• Reading MongoDB data sources with Flink


    I. Reading MongoDB with the plain Java driver

    1. The mongodb-driver

    mongodb-driver is MongoDB's official Java driver for connecting to MongoDB, analogous to a JDBC driver.
    (1) Import it from the Maven repository: https://mvnrepository.com/artifact/org.mongodb/mongodb-driver
    (2) Or download the Java driver from the official site: http://docs.mongodb.org/ecosystem/drivers/java/
    (3) Different driver flavors ship different jars; see: http://mongodb.github.io/mongo-java-driver/
    For example:

     <dependencies>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver-sync</artifactId>
            <version>3.11.2</version>
        </dependency>
      </dependencies>
    
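    Note: the examples below create the client with the legacy new MongoClient("localhost") entry point from the classic driver API. If your project only pulls in mongodb-driver-sync as declared above, the client is created through the MongoClients factory instead. A minimal sketch (it assumes a default local instance on port 27017):

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoDatabase;

    public class SyncDriverConnect {
        public static void main(String[] args) {
            // mongodb-driver-sync creates clients through the MongoClients factory
            try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
                MongoDatabase db = client.getDatabase("test");
                System.out.println("connected to database: " + db.getName());
            }
        }
    }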

    2. Create the example class

    2.1 Query all documents and print them

    package mongodb.test;
    
    import org.bson.Document;
    import com.mongodb.BasicDBObject;
    import com.mongodb.MongoClient;
    import com.mongodb.client.FindIterable;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.MongoDatabase;
    
    public class Mongodb {
        
        /**
         * Query and print the whole collection
         */
        public static void mongoQueryAll() {
            // 1. Create the connection
            MongoClient client = new MongoClient("localhost");
            // 2. Open the test database
            MongoDatabase db = client.getDatabase("test");
            // 3. Get the collection
            MongoCollection<Document> collection = db.getCollection("stu");
            // 4. Run the query and get the document iterable
            FindIterable<Document> documents = collection.find();
            // 5. Iterate and print
            for (Document document : documents) {
                System.out.println(document);
            }
            // 6. Close the connection
            client.close();
        }
        
        public static void main(String[] args) {
            mongoQueryAll();
        }
    }
    
    // Prints all documents in the stu collection
    Document{{_id=5d7374e836a89c5a3d18b87a, name=xiaohua}}
    Document{{_id=2.0, sn=002, name=xiaogang}}
    Document{{_id=3.0, sn=003, name=zhangfei, job=前锋战将}}
    Document{{_id=5d73782736a89c5a3d18b87b, sn=004, name=xiaobingbing}}
    Document{{_id=5d7396b44ec120618b2dd0cb, name=Document{{surname=, name=世名}}, job=[皇帝, 大人物, 大丈夫, 功成名就]}}
    

    2.2 Conditional query

    
     	/**
         * Conditional query: e.g. query all information of a student matching a given field
         */
        public static void mongoConditionQuery() {
            // 1. Create the connection
            MongoClient client = new MongoClient("localhost");
            // 2. Open the test database
            MongoDatabase db = client.getDatabase("test");
            // 3. Get the collection
            MongoCollection<Document> collection = db.getCollection("stu");
            // 4. Build the query condition: query by name
            BasicDBObject stu = new BasicDBObject("name","zhangfei");
            // 5. Run the query with the condition and get the document iterable
            FindIterable<Document> documents = collection.find(stu);
            // 6. Print the results
            for (Document document : documents) {
                System.out.println("name:"+document.getString("name"));
                System.out.println("sn:"+document.getString("sn"));
                System.out.println("job:"+document.getString("job"));
            }
            // 7. Close the connection
            client.close();
        }
        
        public static void main(String[] args) {
            mongoConditionQuery();
        }
    
    // Output
    name:zhangfei
    sn:003
    job:前锋战将
    

    Note: when the query condition also needs a comparison, nest another BasicDBObject, for example to query students whose number is greater than 3:

    // Query students whose sum is greater than 3
    BasicDBObject stu = new BasicDBObject("sum", new BasicDBObject("$gt", 3));
    
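    The same condition can also be built with the driver's Filters helper (a small sketch that drops into the query methods above; it assumes the documents carry a numeric sum field):

    import static com.mongodb.client.model.Filters.gt;

    // equivalent to new BasicDBObject("sum", new BasicDBObject("$gt", 3))
    FindIterable<Document> documents = collection.find(gt("sum", 3));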

    2.3 Insert

     	 /**
         * Insert a document (requires import java.util.HashMap)
         */
        public static void mongoInsert() {
            // 1. Create the connection
            MongoClient client = new MongoClient("localhost");
            // 2. Open the test database
            MongoDatabase db = client.getDatabase("test");
            // 3. Get the collection
            MongoCollection<Document> collection = db.getCollection("stu");
            // 4. Prepare the data to insert
            HashMap<String, Object> map = new HashMap<String, Object>();
            map.put("sn","005");
            map.put("name","xiaoA");
            map.put("job","A工作");
            map.put("sum",6);
            // 5. Convert the map into a Document and insert it
            Document document = new Document(map);
            collection.insertOne(document);
            // 6. Close the connection
            client.close();
        }
        // Test run
        public static void main(String[] args) {
            mongoInsert();
        }
    

    Batch insert (for reference only):

    // When many documents need to be inserted, looping over single inserts works but is inefficient;
    // MongoDB provides a batch insert. With MongoCollection<Document> use insertMany (requires java.util.List and ArrayList):
    List<Document> docs = new ArrayList<Document>();
    docs.add(new Document("name","user29").append("age", 30).append("sex", 1));
    docs.add(new Document("name","user30").append("age", 30).append("sex", 1));
    collection.insertMany(docs);
    // The whole batch is sent to the database in a single request, saving the per-request overhead of looping over single inserts.
    

    II. Reading with Flink via the Hadoop input format

    1. Add the dependencies to pom.xml

    
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_2.11</artifactId>
        <version>1.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb.mongo-hadoop</groupId>
        <artifactId>mongo-hadoop-core</artifactId>
        <version>2.0.0</version>
    </dependency>
    

    2. Read data from MongoDB with the Hadoop input format

    import com.mongodb.hadoop.MongoInputFormat;
    import com.mongodb.hadoop.MongoOutputFormat;
    import com.mongodb.hadoop.io.BSONWritable;
    import example.flink.KeySelector.RecordSeclectId;
    import example.flink.mapFunction.BSONMapToRecord;
    import example.flink.reduceFunction.KeyedGroupReduce;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
    import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.mapreduce.Job;
    import org.bson.BSONObject;
     
    public class MongoSet {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
            Job inputJob = Job.getInstance();
            //inputJob.getConfiguration().set("mongo.input.uri", "mongodb://readuser:readpw@mongos01:port,mongos02:port,mongos03:port/db.collection");
            //inputJob.getConfiguration().set("mongo.auth.uri", "mongodb://root:rootpw@mongos01:port,mongos02:port,mongos03:port/admin");

            // Input URI, split behaviour, query filter and field projection for mongo-hadoop
            inputJob.getConfiguration().set("mongo.input.uri", "mongodb://readuser:readpw@mongos01:port,mongos02:port,mongos03:port/db.collection?&authMechanism=SCRAM-SHA-1&authSource=admin&readPreference=secondary");
            inputJob.getConfiguration().set("mongo.input.split.read_shard_chunks", "true");
            inputJob.getConfiguration().set("mongo.input.split.create_input_splits", "false");
            inputJob.getConfiguration().set("mongo.input.split_size","16");
            inputJob.getConfiguration().set("mongo.input.query", "{'createDateTime': {\"$lte\":{\"$date\":\"2019-05-27T00:00:00.000Z\"}, \"$gte\":{\"$date\":\"2010-03-17T00:00:00.000Z\"}}}");
            inputJob.getConfiguration().set("mongo.input.fields", "{\"Id\":\"1\",\"saleType\":\"1\",\"saleNum\":\"1\",\"createDateTime\":\"1\"}");

            // Wrap mongo-hadoop's MongoInputFormat so Flink can read it as a Hadoop input format
            HadoopInputFormat<Object, BSONObject> hdIf =
                    new HadoopInputFormat<>(new MongoInputFormat(), Object.class, BSONObject.class, inputJob);

            DataSet<Tuple2<Object, BSONObject>> inputNew = env.createInput(hdIf);

            // map/group/reduce with user-defined functions (see the sketch below)
            DataSet<Tuple2<String, BSONWritable>> personInfoDataSet = inputNew
                    .map(new BSONMapToRecord())
                    .groupBy(new RecordSeclectId())
                    .reduceGroup(new KeyedGroupReduce());

            // Write the result back to MongoDB through MongoOutputFormat
            Job outputJob = Job.getInstance();
            outputJob.getConfiguration().set("mongo.output.uri", "mongodb://mongo:27017/db.collection");
            outputJob.getConfiguration().set("mongo.output.batch.size", "8");
            outputJob.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp");
            personInfoDataSet.output(new HadoopOutputFormat<>(new MongoOutputFormat<>(), outputJob));

            env.execute(MongoSet.class.getCanonicalName());
        }
    }
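
    The map, key-selector, and group-reduce classes referenced above live in the author's example.flink package and are not shown. A hypothetical minimal sketch of what such classes could look like (the _id field and the keep-first reduce logic are assumptions):

    import com.mongodb.hadoop.io.BSONWritable;
    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    import org.bson.BSONObject;

    // Maps each (key, BSONObject) pair coming from MongoInputFormat to (documentId, BSONWritable).
    class BSONMapToRecord implements MapFunction<Tuple2<Object, BSONObject>, Tuple2<String, BSONWritable>> {
        @Override
        public Tuple2<String, BSONWritable> map(Tuple2<Object, BSONObject> value) {
            String id = value.f1.get("_id").toString();
            return Tuple2.of(id, new BSONWritable(value.f1));
        }
    }

    // Extracts the grouping key (the document id) from each record.
    class RecordSeclectId implements KeySelector<Tuple2<String, BSONWritable>, String> {
        @Override
        public String getKey(Tuple2<String, BSONWritable> value) {
            return value.f0;
        }
    }

    // Emits one record per key; here simply the first record of each group.
    class KeyedGroupReduce implements GroupReduceFunction<Tuple2<String, BSONWritable>, Tuple2<String, BSONWritable>> {
        @Override
        public void reduce(Iterable<Tuple2<String, BSONWritable>> values, Collector<Tuple2<String, BSONWritable>> out) {
            for (Tuple2<String, BSONWritable> value : values) {
                out.collect(value);
                break;
            }
        }
    }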
    

    III. Flink CDC watching MongoDB oplog changes (real-time data only)

    1. Overview

    The MongoDB CDC connector works by impersonating a member of a MongoDB replica set: leveraging MongoDB's high-availability mechanism, this replica member can obtain the complete oplog (operation log) event stream from the primary node.

    Flink CDC project: https://github.com/ververica/flink-cdc-connectors
    MongoDB CDC: https://github.com/ververica/flink-cdc-connectors/blob/master/docs/content/connectors/mongodb-cdc.md
    MongoDB notes: https://blog.csdn.net/penngo/article/details/124232016

    2. Prerequisites

    • MongoDB version
      MongoDB version >= 3.6

    • Cluster deployment
      Replica set or sharded cluster.

    • Storage engine
      WiredTiger storage engine.

    • Replica set protocol version
      Replica set protocol version 1 (pv1).
      Starting with version 4.0, MongoDB only supports pv1. pv1 is the default for all new replica sets created with MongoDB 3.2 or later.

    • Required privileges
      The MongoDB Kafka Connector needs the changeStream and read privileges.
      You can use the example below for a simple grant;
      for more fine-grained authorization, see MongoDB database user roles.

    use admin;
    db.createUser({
      user: "flinkuser",
      pwd: "flinkpw",
      roles: [
        { role: "read", db: "admin" }, // the read role includes the changeStream privilege
        { role: "readAnyDatabase", db: "admin" } // for snapshot reading
      ]
    });
    

    3. Configure a MongoDB replica set

    Create mongo1.conf, mongo2.conf, and mongo3.conf:

    # mongo1.conf
    dbpath=/data/mongodb-4.4.13/data1
    logpath=/data/mongodb-4.4.13/mongo1.log
    logappend=true
    port=27017
    replSet=replicaSet_penngo  # replica set name
    oplogSize=200
    
    # mongo2.conf
    dbpath=/data/mongodb-4.4.13/data2
    logpath=/data/mongodb-4.4.13/mongo2.log
    logappend=true
    port=27018
    replSet=replicaSet_penngo  # replica set name
    oplogSize=200
    
    # mongo3.conf
    dbpath=/data/mongodb-4.4.13/data3
    logpath=/data/mongodb-4.4.13/mongo3.log
    logappend=true
    port=27019
    replSet=replicaSet_penngo  # replica set name
    oplogSize=200
    

    Start the mongod servers.
    Run the following commands in separate terminals:

    > mongod --config ../mongo1.conf
    > mongod --config ../mongo2.conf
    > mongod --config ../mongo3.conf
    

    Connect to MongoDB and configure the replica set with the mongo shell:

    > mongo --port 27017
    # Run the following in the mongo shell to initialize the replica set
    > rsconf = {
        _id: "replicaSet_penngo",
        members: [
          {_id: 0, host: "localhost:27017"},
          {_id: 1, host: "localhost:27018"},
          {_id: 2, host: "localhost:27019"}
        ]
      }
    > rs.initiate(rsconf)
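
    Once rs.initiate() returns, you can check that the members have reached PRIMARY/SECONDARY state:

    > rs.status().members.forEach(function(m) { print(m.name + "  " + m.stateStr); })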
    

    In the mongo shell, create the penngo_db database and the coll collection, and insert 1000 documents:

    > use penngo_db
    > for (i=0; i<1000; i++) {db.coll.insert({user: "penngo" + i})}
    > db.coll.count()
    


    Create a new user in the mongo shell for Flink MongoDB CDC to use:

    > use admin;
    > db.createUser({
      user: "flinkuser",
      pwd: "flinkpw",
      roles: [
        { role: "read", db: "admin" }, //read role includes changeStream privilege 
        { role: "readAnyDatabase", db: "admin" } //for snapshot reading
      ]
    });
    

    4. Create the Maven project

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.penngo.flinkcdc</groupId>
      <artifactId>FlickCDC</artifactId>
      <packaging>jar</packaging>
      <version>1.0-SNAPSHOT</version>
      <name>FlickCDC_TEST</name>
      <url>https://21doc.net/</url>
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <flink-version>1.13.3</flink-version>
        <flink-cdc-version>2.1.1</flink-cdc-version>
      </properties>
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>${flink-version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-base</artifactId>
          <version>${flink-version}</version>
        </dependency>

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_2.12</artifactId>
          <version>${flink-version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_2.12</artifactId>
          <version>${flink-version}</version>
        </dependency>

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-common</artifactId>
          <version>${flink-version}</version>
        </dependency>

        <dependency>
          <groupId>com.ververica</groupId>
          <artifactId>flink-connector-mysql-cdc</artifactId>
          <version>${flink-cdc-version}</version>
        </dependency>
        <dependency>
          <groupId>com.ververica</groupId>
          <artifactId>flink-connector-mongodb-cdc</artifactId>
          <version>${flink-cdc-version}</version>
        </dependency>
      </dependencies>
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
              <source>${maven.compiler.source}</source>
              <target>${maven.compiler.target}</target>
              <encoding>${project.build.sourceEncoding}</encoding>
            </configuration>
          </plugin>
        </plugins>
      </build>
      <repositories>
        <repository>
          <id>alimaven</id>
          <name>Maven Aliyun Mirror</name>
          <url>https://maven.aliyun.com/repository/central</url>
        </repository>
      </repositories>
    </project>
    

    MongoDBExample.java

    package com.penngo.flinkcdc;
    
    import com.ververica.cdc.connectors.mongodb.MongoDBSource;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.commons.lang3.StringEscapeUtils;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.util.Collector;
    
    public class MongoDBExample {
        public static void main(String[] args) throws Exception{
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // 2. Build the SourceFunction with Flink CDC
            SourceFunction<String> mongoDBSourceFunction = MongoDBSource.<String>builder()
                    .hosts("127.0.0.1:27017")
                    .username("flinkuser")
                    .password("flinkpw")
                    .database("penngo_db")
                    .collection("coll")
    //                .databaseList("penngo_db")
    //                .collectionList("coll")
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .build();
    
            DataStreamSource<String> dataStreamSource = env.addSource(mongoDBSourceFunction);
    
            SingleOutputStreamOperator<Object> singleOutputStreamOperator = dataStreamSource.process(new ProcessFunction<String, Object>() {
                @Override
                public void processElement(String value, ProcessFunction<String, Object>.Context ctx, Collector<Object> out) {
                    try {
                        System.out.println("processElement=====" + value);
                    }catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
    
            dataStreamSource.print("raw stream--");
            env.execute("Mongo");
        }
    }
    

    Running result: each change record is printed to the console as a JSON string.
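
    With the job running, any insert, update, or delete on penngo_db.coll should show up as a change record on the stream. For example, from the mongo shell:

    > use penngo_db
    > db.coll.insert({user: "penngo_cdc_test"})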

    IV. Flink SQL CDC monitoring MongoDB
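
    The MongoDB CDC connector can also be used from Flink SQL; see the MongoDB CDC documentation linked above for the full option list. A minimal DDL sketch against the penngo_db.coll collection from the previous section (the value column is illustrative):

    CREATE TABLE coll_source (
      _id STRING,
      `user` STRING,
      PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mongodb-cdc',
      'hosts' = 'localhost:27017',
      'username' = 'flinkuser',
      'password' = 'flinkpw',
      'database' = 'penngo_db',
      'collection' = 'coll'
    );

    SELECT * FROM coll_source;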

  • Original article: https://blog.csdn.net/lck_csdn/article/details/126122730