• Kafka与Flink的整合 -- sink、source


    1、首先导入依赖:
    1. <dependency>
    2. <groupId>org.apache.flink</groupId>
    3. <artifactId>flink-connector-kafka</artifactId>
    4. <version>1.15.2</version>
    5. </dependency>
    2、 source:Flink从Kafka中读取数据
    1. public class Demo01KafkaSource {
    2. public static void main(String[] args) throws Exception{
    3. //构建环境
    4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    5. //构建kafka source 环境
    6. KafkaSource source = KafkaSource.builder()
    7. //指定broker列表
    8. .setBootstrapServers("master:9092,node1:9092,node2:9092")
    9. //指定topic
    10. .setTopics("bigdata")
    11. //指定消费组
    12. .setGroupId("my-group")
    13. //指定数据的读取的位置,earliest指的是读取最早的数据,latest:指定的读取的是最新的数据
    14. .setStartingOffsets(OffsetsInitializer.earliest())
    15. //读取数据格式:
    16. .setValueOnlyDeserializer(new SimpleStringSchema())
    17. .build();
    18. //使用kafka数据源
    19. DataStreamSource kafkaSourceDS = env.
    20. fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
    21. kafkaSourceDS.print();
    22. //启动flink
    23. env.execute();
    24. }
    25. }
            启动生产kafka:
    kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bigdata
    3、sink:Flink向Kafka中写入数据
    1. public class Demo02KafkaSink {
    2. public static void main(String[] args) throws Exception{
    3. //构建flink的环境
    4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    5. //读取数据文件:
    6. DataStreamSource studentDS = env.readTextFile("flink/data/students.txt");
    7. //创建kafka sink
    8. KafkaSink sink = KafkaSink.builder()
    9. //指定flink broker列表
    10. .setBootstrapServers("master:9092,node1:9092,node2:9092")
    11. //指定数据的格式:
    12. .setRecordSerializer(KafkaRecordSerializationSchema.builder()
    13. //指定topic,如果topic不存在就会自动的创建一个分区是1个副本是1个的topic
    14. .setTopic("student")
    15. //指定数据的格式
    16. .setValueSerializationSchema(new SimpleStringSchema())
    17. .build()
    18. )
    19. //指定数据处理的语义:
    20. .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    21. .build();
    22. //执行flink
    23. studentDS.sinkTo(sink);
    24. //构建flink环境
    25. env.execute();
    26. }
    27. }
            启动消费kafka:
    kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic student

  • 相关阅读:
    基于window10的远程桌面报错:要求的函数不受支持 的问题解决方法
    Yolo v8中的上下文管理器
    使用 KubeSphere 部署高可用 RocketMQ 集群
    VMware vSphere虚拟机Windows重置系统密码方法总结
    基于Redis+Cookie实现Session共享
    十年架构五年生活-07 年轻气盛的蜕变
    设计模式之责任链模式
    js 定时器 setInterval(图片的自动变换)
    Typora如何把图片上传到图床smms.app
    谷粒学苑项目前台界面 (一)
  • 原文地址:https://blog.csdn.net/m0_62078954/article/details/134274702