• Flink Kafka获取数据写入到MongoDB中 样例


    简述

    Apache Flink 是一个流处理和批处理的开源框架,它允许从各种数据源(如 Kafka)读取数据,处理数据,然后将数据写入到不同的目标系统(如 MongoDB)。以下是一个简化的流程,描述如何使用 Flink 从 Kafka 读取数据并保存到 MongoDB:

    1、环境准备

    • 安装并配置 Apache Flink。
    • 安装并配置 Apache Kafka。
    • 安装并配置 MongoDB。
    • 创建一个 Kafka 主题,并发送一些测试数据。
    • 确保 Flink 可以连接到 Kafka 和 MongoDB。

    部署参考:
    1、flink:Flink 部署执行模式
    2、kafka:Flink mongo & Kafka
    3、mongoDb:mongo副本集本地部署

    2. 添加依赖

    在Flink 项目中,需要添加 Kafka 和 MongoDB 的连接器依赖。对于 Maven 项目,可以在 pom.xml 文件中添加相应的依赖。
    对于 Kafka,需要添加 Flink Kafka Connector 的依赖。
    对于 MongoDB,需要添加 Flink MongoDB Sink 的依赖。

    3. 编写 Flink 作业

    * 创建一个 Flink 作业,使用 Flink 的 `FlinkKafkaConsumer` 从 Kafka 主题中读取数据。  
    * 对读取的数据进行必要的转换或处理。  
    * 使用 MongoDB 的 Java 驱动程序或第三方库将处理后的数据写入 MongoDB。
    

    4. 运行 Flink 作业

    使用 Flink 的命令行工具或 IDE 运行 Flink 作业。确保 Kafka 和 MongoDB 正在运行,并且 Flink 可以访问它们。

    参考:Flink 命令行提交、展示和取消作业

    5. 监控和调试

    使用 Flink 的 Web UI 或其他监控工具来监控作业。如果出现问题,检查日志并进行调试。

    6. 优化和扩展

    根据需求和数据量,优化 Flink 作业的性能和可扩展性。这可能包括调整并行度、增加资源、优化数据处理逻辑等。

    代码

    package com.wfg.flink.connector.kafka;
    
    import com.mongodb.client.model.InsertOneModel;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.connector.mongodb.sink.MongoSink;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.bson.BsonDocument;
    
    import static com.wfg.flink.connector.constants.Constants.KAFKA_BROKERS;
    import static com.wfg.flink.connector.constants.Constants.TEST_TOPIC_PV;
    
    /**
     * @author wfg
     */
    public class KafkaToWriteMongo {
        public static void main(String[] args) throws Exception {
            // 1. 设置 Flink 执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers(KAFKA_BROKERS)
                    .setTopics(TEST_TOPIC_PV)
                    .setGroupId("my-test-topic-pv")
                    .setStartingOffsets(OffsetsInitializer.latest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();
    
            DataStreamSource<String> rs = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
            // 创建RollingFileSink
            MongoSink<String> sink = MongoSink.<String>builder()
                    .setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin")
                    .setDatabase("sjzz")
                    .setCollection("TestMongoPv")
                    .setMaxRetries(3)
    //                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                    .setSerializationSchema(
                            (input, context) -> {
                                System.out.println(input);
                                return new InsertOneModel<>(BsonDocument.parse(input));
                            })
                    .build();
            rs.sinkTo(sink);
            // 6. 执行 Flink 作业
            env.execute("Kafka Flink Job");
        }
    }
    
    
  • 相关阅读:
    JS函数function
    视频超分之BasicVSR++阅读笔记
    从github上下载下来的代码下载依赖提示: An unknown git error occurred
    刷题记录:牛客NC24263[USACO 2018 Feb G]Directory Traversal
    设计模式 23 访问者模式
    一文搞定垃圾回收器
    模板:斯坦纳树
    Oracle 19C 静默安装 GoldenGate
    【MySQL 数据库】9、存储过程
    ThinkPHP3.2.3反序列化链子分析
  • 原文地址:https://blog.csdn.net/mqiqe/article/details/139781853