• Kafka与Flink的整合 -- sink、source


    1、首先导入依赖:
    1. <dependency>
    2. <groupId>org.apache.flink</groupId>
    3. <artifactId>flink-connector-kafka</artifactId>
    4. <version>1.15.2</version>
    5. </dependency>
    2、 source:Flink从Kafka中读取数据
    1. public class Demo01KafkaSource {
    2. public static void main(String[] args) throws Exception{
    3. //构建环境
    4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    5. //构建kafka source 环境
    6. KafkaSource source = KafkaSource.builder()
    7. //指定broker列表
    8. .setBootstrapServers("master:9092,node1:9092,node2:9092")
    9. //指定topic
    10. .setTopics("bigdata")
    11. //指定消费组
    12. .setGroupId("my-group")
    13. //指定数据的读取的位置,earliest指的是读取最早的数据,latest:指定的读取的是最新的数据
    14. .setStartingOffsets(OffsetsInitializer.earliest())
    15. //读取数据格式:
    16. .setValueOnlyDeserializer(new SimpleStringSchema())
    17. .build();
    18. //使用kafka数据源
    19. DataStreamSource kafkaSourceDS = env.
    20. fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
    21. kafkaSourceDS.print();
    22. //启动flink
    23. env.execute();
    24. }
    25. }
            启动生产kafka:
    kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bigdata
    3、sink:Flink向Kafka中写入数据
    1. public class Demo02KafkaSink {
    2. public static void main(String[] args) throws Exception{
    3. //构建flink的环境
    4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    5. //读取数据文件:
    6. DataStreamSource studentDS = env.readTextFile("flink/data/students.txt");
    7. //创建kafka sink
    8. KafkaSink sink = KafkaSink.builder()
    9. //指定flink broker列表
    10. .setBootstrapServers("master:9092,node1:9092,node2:9092")
    11. //指定数据的格式:
    12. .setRecordSerializer(KafkaRecordSerializationSchema.builder()
    13. //指定topic,如果topic不存在就会自动的创建一个分区是1个副本是1个的topic
    14. .setTopic("student")
    15. //指定数据的格式
    16. .setValueSerializationSchema(new SimpleStringSchema())
    17. .build()
    18. )
    19. //指定数据处理的语义:
    20. .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    21. .build();
    22. //执行flink
    23. studentDS.sinkTo(sink);
    24. //构建flink环境
    25. env.execute();
    26. }
    27. }
            启动消费kafka:
    kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic student

  • 相关阅读:
    车载软件架构 —— AUTOSAR Vector SIP包(三)
    JMeter适合做接口测试吗?
    使用react-amanda快速搭建管理类型的系统
    开发了一个在线客服系统
    python数据容器
    LEO天线,全球市场总体规模,头部前八大厂商排名及市场份额
    vulhub中GitLab 任意文件读取漏洞复现(CVE-2016-9086)
    【无标题】
    文心一言 VS 讯飞星火 VS chatgpt (103)-- 算法导论10.1 1题
    Shell编程之免交互
  • 原文地址:https://blog.csdn.net/m0_62078954/article/details/134274702