• Flink的简单学习(kafka)三


    一 Kafka的介绍

    1.kafka是一个高吞吐的分布式消息系统,是一个消息队列

    2.生产者负责生产数据 ,消费者负责消费数据

    3.特点:

    生存者消费者模型,FIFO

    高性能:单节点支持上千个客户端,百MB/s吞吐

    持久性:消息直接持久化在普通磁盘上且性能好

    分布式:数据副本冗余、流量负载均衡、可扩展

    很灵活:消息长时间持久化+Client维护消费状态

    4.性能好的原因

    kafka写磁盘是顺序的,所以不断的往前产生,不断的往后写

    kafka还用了sendFile的0拷贝技术,提高速度

    而且还用到了批量读写,一批批往里写,64K为单位

    二 Kafka的搭建

    2.1 上传解压修改环境变量

    1. # 解压
    2. tar -zxvf kafka_2.11-1.0.0.tgz -C ../
    3. mv kafka_2.11-1.0.0 kafka-1.0.0
    4. # 配置环境变量
    5. vim /etc/profile
    6. export KAFKA_HOME=/usr/local/soft/kafka-1.0.0
    7. export PATH=$PATH:$KAFKA_HOME/bin
    8. source /etc/profile

    2.2 修改配置文件

    1.修改config目录下的server.properties文件

    1. broker.id=0 每一个节点broker.id 要不一样
    2. zookeeper.connect=master:2181,node1:2181,node2:2181/kafka
    3. log.dirs=/usr/local/soft/kafka-1.0.0/data 数据存放的位置

    2.3 同步

    1. # 同步kafka文件
    2. scp -r kafka-1.0.0/ node1:`pwd`
    3. scp -r kafka-1.0.0/ node2:`pwd`
    4. # 将master中的而环境变量同步到node1和node2
    5. scp /etc/profile node1:/etc/
    6. scp /etc/profile node2:/etc/
    7. # 在ndoe1和node2中执行source
    8. source /etc/profile

    2.4 修改其他节点的文件

    1. # node1
    2. broker.id=1
    3. # node2
    4. broker.id=2

    2.5 启动kafka

     1、需要启动zookeeper,  kafka使用zk保存元数据
     需要在每隔节点中执行启动的命令
     zkServer.sh start

    2.每个节点中都要启动(去中心化的架构)
    # -daemon后台启动

    /usr/local/soft/kafka-1.0.0/config/server.properties:配置文件的路径要写全
    kafka-server-start.sh -daemon /usr/local/soft/kafka-1.0.0/config/server.properties

    3.测试 

    任意一个节点输入

    kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic shujia

    topic shujia:只是名字而已可以随便取

    另一个节点输入

    kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic shujia

    但是主要最后这个节点后面的取名也跟前面的一样

    4.数据存放在data目录下 但是不知道是哪个节点下的 

    5.默认生产者产生的数据可以保存7天,消费者可以在这7天内使用这些数据,7天一过,数据自动销毁

    三 Kafka的架构

    3.1 基本概念

    1.producer:消息生存者

    2.consumer:消息消费者

    3.broker:kafka集群的server:负责处理消息读、写请求,存储消息

    4.topic:消息队列/分类

    5.broker就是代理,在kafka cluster这一层这里,其实里面是有很多个broker

    6.topic就相当于queue

    3.2 消息存储和生产消费模型

    1.一个topic分成多个partition。一个topic可以看成一张表,被分成多个分区。一个partition可以看作一个并行度。

    2.每个partition内部消息强有序,其中的每个消息都有一个序号叫offset。每个分区内部的数据有有序的,先进先出,但是不同的partition里面的数据出来的顺序是随机的。

    3.一个partition只对应一个broker,一个broker可以管多个partition。

    4.消息不经过内存缓冲,直接写入文件。

    5.根据时间策略删除,而不是消费完就删除

    6.producer自己决定往哪个partition写消息,可以是轮询的负载均衡,或者是基于hash的partition策略

    7.consumer自己维护消费到哪个offset

    8.每个consumer都有对应的group

    9.group内是queue消费模型 :

            各个consumer消费不同的partition

            因此一个消息在group内只消费一次

    10.group间是publish-subscribe消费模型

            各个group各自独立消费,互不影响

            因此一个消息在被每个group消费一次

     

    3.3 使用kafka

    3.3.1  创建topic

    1.输入命令

    kafka-topics.sh --create --zookeeper master:2181,node1:2181,node2:2181/kafka --replication-factor 2 --partitions 3 --topic bigdata

    --replication-factor  ---每一个分区的副本数量, 同一个分区的副本不能放在同一个节点,副本的数量不能大于kafak集群节点的数量
    --partition   --分区数,  根据数据量设置
    --zookeeper zk的地址,将topic的元数据保存在zookeeper中

    2.在生产和消费数据时,如果topic不存在会自动创建一个分区为1,副本为1的topic

    3.3.2 查看topic描述信息

    1.输入命令

    kafka-topics.sh --describe  --zookeeper master:2181,node1:2181,node2:2181/kafka --topic bigdata

    partition的数字是分区编号,leader后面的数字是节点,后面是存储的分区编号,但是不是匹配的,这里的leader0的分区编号是0与1,leader1的分区编号是1与2,leader2的分区是0与2.

    3.3.3 获取所有topic

    kafka-topics.sh --list  --zookeeper  master:2181,node1:2181,node2:2181/kafka

    2.__consumer_offsetsL kafka用于保存消费便宜量的topic

    3.3.4 创建控制台生产者

    kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bigdata

    3.3.5 创建控制台消费者

    kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic bigdata

    3.3.6 数据保存的方式

    1. # 1、保存的文件
    2. /usr/local/soft/kafka_2.11-1.0.0/data
    3. # 2,每一个分区每一个副本对应一个目录
    4. # 3、每一个分区目录中可以有多个文件, 文件时滚动生成的
    5. 00000000000000000000.log
    6. 00000000000000000001.log
    7. 00000000000000000002.log
    8. # 4、滚动生成文件的策略
    9. log.segment.bytes=1073741824
    10. log.retention.check.interval.ms=300000
    11. # 5、文件删除的策略,默认时7天,以文件为单位删除
    12. log.retention.hours=168

    3.3.7 删除topic

    1.先在配置文件vim config/server.properties加一行代码delete.topic.enable=true

    再执行命令:

    kafka-topics.sh --delete --topic students_hash --zookeeper master:2181,node1:2181,node2:2181/kafka

     四 JavaAPI

    4.1 producer

    1.先需要创建Properties对象,并设置broker列表以及kv的数据类型

    1. Properties properties = new Properties();
    2. //指定broker列表
    3. properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");
    4. //指定keyvalue的数据格式
    5. properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    6. properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    2.创建KafkaProducer对象,泛型是上面kv的数据类型,里面传入Properties的对象

     Producer<String, String> producer = new KafkaProducer<String, String>(properties);
    

    3.生产数据:使用KafkaProducer对象中的send方法,里面传入ProducerRecord的对象。前面是topic,后面是数据

    producer.send(new ProducerRecord<>("words","java"));
    

    4.刷新并关闭

    5.查看去命令行查看,相关命令去看使用Kafka

    6.全部代码

    1. package com.shujia.kafka;
    2. import org.apache.kafka.clients.producer.KafkaProducer;
    3. import org.apache.kafka.clients.producer.Producer;
    4. import org.apache.kafka.clients.producer.ProducerRecord;
    5. import java.io.BufferedReader;
    6. import java.io.FileReader;
    7. import java.util.Properties;
    8. public class Demo2StudentToKafka {
    9. public static void main(String[] args)throws Exception {
    10. Properties properties = new Properties();
    11. //指定broker列表
    12. properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");
    13. //指定key和value的数据格式
    14. properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    15. properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    16. Producer producer = new KafkaProducer(properties);
    17. BufferedReader br = new BufferedReader(new FileReader("flink/data/students.csv"));
    18. String line =null;
    19. while ((line=br.readLine())!=null){
    20. producer.send(new ProducerRecord<>("students",line));
    21. producer.flush();
    22. }
    23. br.close();
    24. producer.close();
    25. }
    26. }

     7.hash分区代码

    这个分区是先在命令行中已经分好的,然后通过某一个字段的哈希值值分组

    kafka-topics.sh --create --zookeeper master:2181,node1:2181,node2:2181/kafka --replication-factor 2 --partitions 3 --topic students_hash

    1. package com.shujia.kafka;
    2. import org.apache.kafka.clients.producer.KafkaProducer;
    3. import org.apache.kafka.clients.producer.Producer;
    4. import org.apache.kafka.clients.producer.ProducerRecord;
    5. import java.io.BufferedReader;
    6. import java.io.FileReader;
    7. import java.util.Properties;
    8. public class Demo4StudentToKafka {
    9. public static void main(String[] args)throws Exception {
    10. Properties properties = new Properties();
    11. //指定broker列表
    12. properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");
    13. //指定keyvalue的数据格式
    14. properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    15. properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    16. Producer<String, String> producer = new KafkaProducer<String, String>(properties);
    17. BufferedReader br = new BufferedReader(new FileReader("flink/data/students.csv"));
    18. String line =null;
    19. while ((line=br.readLine())!=null){
    20. //通过hash分区,hash分区就是将那个字段的hash值对分区数取余
    21. String clazz = line.split(",")[4];
    22. int partition = Math.abs(clazz.hashCode()) % 3;
    23. //kafka-topics.sh --create --zookeeper master:2181,node1:2181,node2:2181/kafka --replication-factor 2 --partitions 3 --topic students_hash
    24. //ProducerRecord(String topic, Integer partition, K key, V value)
    25. producer.send(new ProducerRecord<>("students_hash",partition,null,line));
    26. producer.flush();
    27. }
    28. br.close();
    29. producer.close();
    30. }
    31. }

    4.2 consume

    1. 先需要创建Properties对象,并设置broker列表,kv的数据类型,读取的方式,以及指定消费者组

    其中的earliest是全局消费,latest是提交一次offset消费一次,如果不指定,默认是latest

    1. Properties properties = new Properties();
    2. //kafka 集群列表
    3. properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");
    4. //读取数据的格式
    5. properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    6. properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    7. /*
    8. * earliest
    9. * 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    10. * latest 默认
    11. * 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产认值生的该分区下的数据
    12. * none
    13. * topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
    14. *
    15. */
    16. properties.setProperty("auto.offset.reset", "earliest");
    17. //指定消费者组,一条数据在一个组内只消费一次
    18. properties.setProperty("group.id", "asdsada");

    2.创建KafkaConsume对象,泛型是上面kv的数据类型,里面传入Properties的对象

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    

    3.订阅topic,封装成一个集合。再使用KafkaConsume对象的subscribe方法,里面传入的是集合

    1. //订阅topic
    2. ArrayList<String> topics = new ArrayList<>();
    3. topics.add("students");
    4. //subscribe方法里面传的是一个集合,所以要把topic封装成一个集合对象
    5. consumer.subscribe(topics);

    4.设置消费的间隔时间,获取一个迭代器

            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
    

    5.遍历迭代器获取里面的元素,包括topic,partition,offset,value,timestamp等等

    1. for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
    2. String topic = consumerRecord.topic();
    3. int partition = consumerRecord.partition();
    4. long offset = consumerRecord.offset();
    5. String value = consumerRecord.value();
    6. long timestamp = consumerRecord.timestamp();
    7. System.out.println(topic + "\t" + offset + "\t" + partition + "\t" + value + "\t" + timestamp);
    8. }

    6.一般一组只能取500个,想获取完要么改分组名,要么改成死循环,让他一直消费,只要有数据产生,就能一直消费

    7.一直取的代码

    1. package com.shujia.kafka;
    2. import org.apache.kafka.clients.consumer.ConsumerRecord;
    3. import org.apache.kafka.clients.consumer.ConsumerRecords;
    4. import org.apache.kafka.clients.consumer.KafkaConsumer;
    5. import java.util.ArrayList;
    6. import java.util.Properties;
    7. public class Demo3Consumer {
    8. public static void main(String[] args) {
    9. Properties properties = new Properties();
    10. //kafka 集群列表
    11. properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");
    12. //读取数据的格式
    13. properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    14. properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    15. /*
    16. * earliest
    17. * 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    18. * latest 默认
    19. * 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产认值生的该分区下的数据
    20. * none
    21. * topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
    22. *
    23. */
    24. properties.setProperty("auto.offset.reset", "earliest");
    25. //指定消费者组,一条数据在一个组内只消费一次
    26. properties.setProperty("group.id", "asdsada");
    27. //创建消费者
    28. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    29. //订阅topic
    30. ArrayList<String> topics = new ArrayList<>();
    31. topics.add("students");
    32. //subscribe方法里面传的是一个集合,所以要把topic封装成一个集合对象
    33. consumer.subscribe(topics);
    34. while (true){
    35. ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
    36. for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
    37. String topic = consumerRecord.topic();
    38. int partition = consumerRecord.partition();
    39. long offset = consumerRecord.offset();
    40. String value = consumerRecord.value();
    41. long timestamp = consumerRecord.timestamp();
    42. System.out.println(topic + "\t" + offset + "\t" + partition + "\t" + value + "\t" + timestamp);
    43. }
    44. }
    45. }
    46. }

    五 Flink On Kafka

    5.1 kafka source

    1.见代码

    1. package com.shujia.flink.kafka;
    2. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    3. import org.apache.flink.api.common.serialization.SimpleStringSchema;
    4. import org.apache.flink.connector.kafka.source.KafkaSource;
    5. import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    6. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    8. public class Demo1KafkaSource {
    9. public static void main(String[] args) throws Exception {
    10. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    11. //创建kafka source
    12. KafkaSource<String> source = KafkaSource.<String>builder()
    13. .setBootstrapServers("master:9092,node1:9092,node2:9092")//kafka集群列表
    14. .setTopics("students")//指定消费的topic
    15. // 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点
    16. //.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    17. // 从最早位点开始消费
    18. //.setStartingOffsets(OffsetsInitializer.earliest())
    19. // 从最末尾位点开始消费
    20. // .setStartingOffsets(OffsetsInitializer.latest())
    21. .setStartingOffsets(OffsetsInitializer.earliest())
    22. .setValueOnlyDeserializer(new SimpleStringSchema())//指定读取数据的格式
    23. .build();
    24. //使用kafka source
    25. DataStreamSource<String> studentsDS = env
    26. .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
    27. studentsDS.print();
    28. env.execute();
    29. }
    30. }

    5.2 kafka sink

    1. package com.shujia.flink.kafka;
    2. import org.apache.flink.api.common.serialization.SimpleStringSchema;
    3. import org.apache.flink.connector.base.DeliveryGuarantee;
    4. import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    5. import org.apache.flink.connector.kafka.sink.KafkaSink;
    6. import org.apache.flink.streaming.api.datastream.DataStream;
    7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    8. public class Demo2KafkaSink {
    9. public static void main(String[] args) throws Exception{
    10. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    11. DataStream<String> carsDS = env.readTextFile("flink/data/cars_sample.json");
    12. //创建kafka sink
    13. KafkaSink<String> sink = KafkaSink.<String>builder()
    14. .setBootstrapServers("master:9092,node1:9092,node2:9092")//kafka集群列表
    15. .setRecordSerializer(KafkaRecordSerializationSchema.builder()
    16. .setTopic("cars")//指定topic
    17. .setValueSerializationSchema(new SimpleStringSchema())//指定数据格式
    18. .build()
    19. )
    20. //指定数据处理的语义
    21. .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    22. .build();
    23. //使用kafka sink
    24. carsDS.sinkTo(sink);
    25. env.execute();
    26. }
    27. }

    5.3 代码打包到服务器运行

    1.导入依赖,将flink-sql-connector-kafka-1.15.2.jar放到flink/lib目录下

    2.执行命令随便选一个

    flink run-application -t yarn-application -c com.shujia.flink.kafka.Demo3Cars flink-1.0.jar 
     

  • 相关阅读:
    常用git命令
    盲盒电商平台商业玩法解析
    3.7 C++高级编程_自己实现智能指针
    C++流程控制总结,看这一篇就够了
    2022年2000元能玩原神的手机推荐 这3款值得买
    【iMessage苹果推日历推位置推送】软件安装 UIApplication 的 registerForRemoteNotifications
    编译器一日一练(DIY系列之四则运算)
    【数据结构】堆(万字详解)
    Fabric.js 变换视窗
    redis中value/SortedSet
  • 原文地址:https://blog.csdn.net/weixin_65909965/article/details/139424354