• Kafka与Flink的整合 -- sink、source


    1、首先导入依赖:
    1. <dependency>
    2. <groupId>org.apache.flink</groupId>
    3. <artifactId>flink-connector-kafka</artifactId>
    4. <version>1.15.2</version>
    5. </dependency>
    2、 source:Flink从Kafka中读取数据
    1. public class Demo01KafkaSource {
    2. public static void main(String[] args) throws Exception{
    3. //构建环境
    4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    5. //构建kafka source 环境
    6. KafkaSource source = KafkaSource.builder()
    7. //指定broker列表
    8. .setBootstrapServers("master:9092,node1:9092,node2:9092")
    9. //指定topic
    10. .setTopics("bigdata")
    11. //指定消费组
    12. .setGroupId("my-group")
    13. //指定数据的读取的位置,earliest指的是读取最早的数据,latest:指定的读取的是最新的数据
    14. .setStartingOffsets(OffsetsInitializer.earliest())
    15. //读取数据格式:
    16. .setValueOnlyDeserializer(new SimpleStringSchema())
    17. .build();
    18. //使用kafka数据源
    19. DataStreamSource kafkaSourceDS = env.
    20. fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
    21. kafkaSourceDS.print();
    22. //启动flink
    23. env.execute();
    24. }
    25. }
            启动生产kafka:
    kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bigdata
    3、sink:Flink向Kafka中写入数据
    1. public class Demo02KafkaSink {
    2. public static void main(String[] args) throws Exception{
    3. //构建flink的环境
    4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    5. //读取数据文件:
    6. DataStreamSource studentDS = env.readTextFile("flink/data/students.txt");
    7. //创建kafka sink
    8. KafkaSink sink = KafkaSink.builder()
    9. //指定flink broker列表
    10. .setBootstrapServers("master:9092,node1:9092,node2:9092")
    11. //指定数据的格式:
    12. .setRecordSerializer(KafkaRecordSerializationSchema.builder()
    13. //指定topic,如果topic不存在就会自动的创建一个分区是1个副本是1个的topic
    14. .setTopic("student")
    15. //指定数据的格式
    16. .setValueSerializationSchema(new SimpleStringSchema())
    17. .build()
    18. )
    19. //指定数据处理的语义:
    20. .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    21. .build();
    22. //执行flink
    23. studentDS.sinkTo(sink);
    24. //构建flink环境
    25. env.execute();
    26. }
    27. }
            启动消费kafka:
    kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic student

  • 相关阅读:
    1.R语言介绍
    隐私计算头条周刊(9.11-9.17)
    软件开发的一般步骤
    208.实现Trie(前缀树)
    总结了几个做用户体验设计的原则,分享给需要的朋友
    DPDK网卡RSS(receive side scaling)简介
    在 JavaScript 中,什么时候使用 Map 或胜过 Object
    【Git】GitHub 操作
    25个Matplotlib图的Python代码,复制直接可用
    [datawhale202211]跨模态神经搜索实践:环境配置
  • 原文地址:https://blog.csdn.net/m0_62078954/article/details/134274702