• MongoDB实时同步方案


    MongoDB实时同步方案

    方案一:使用开源组件MongoShake

    缺点:如果kafka加了ssl认证,MongoShake没有实现这个功能

    不能导入到pulsar

    MongoShake简介

    ​ MongoShake是一个以golang语言进行编写的通用的平台型服务,通过读取MongoDB集群的Oplog操作日志,对MongoDB的数据进行复制,后续通过操作日志实现特定需求。日志可以提供很多场景化的应用,为此,我们在设计时就考虑了把MongoShake做成通用的平台型服务。通过操作日志,我们提供日志数据订阅消费PUB/SUB功能,可通过SDK、Kafka、MetaQ等方式灵活对接以适应不同场景(如日志订阅、数据中心同步、Cache异步淘汰等)。集群数据同步是其中核心应用场景,通过抓取oplog后进行回放达到同步目的,实现灾备和多活的业务场景。

    MongoShake架构

    在这里插入图片描述

    MongoShake快速上手

    github仓库地址:https://github.com/alibaba/MongoShake

    把数据从mongodb实时导入到kafka

    目录结构

    mongoshake
    ├── ChangeLog
    ├── collector.conf
    ├── collector.conf.db2db
    ├── collector.darwin
    ├── collector.linux
    ├── collector.windows
    ├── comparison.py
    ├── diagnostic
    │ └── mongoshake.testrs.journal
    ├── hypervisor
    ├── logs
    │ ├── collector.log
    │ ├── collector.log.1
    │ ├── mongoshake.log
    │ ├── mongoshake.log.1
    │ ├── mongoshake.log.2
    │ ├── mongoshake.log.3
    │ ├── mongoshake.log.4
    │ ├── mongoshake.log.5
    │ ├── mongoshake.log.6
    │ ├── mongoshake.log.7
    │ ├── receiver.log
    │ └── receiver.log.1
    ├── mongoshake.pid
    ├── mongoshake-stat
    ├── receiver.conf
    ├── receiver.darwin
    ├── receiver.linux
    ├── receiver.windows
    ├── start.sh
    └── stop.sh

    在 MongoShake 中实现该功能比较简单, 只需要配置 collector(控制器,实现oplog数据的连接)

    如果是单机版的mongo数据库,需要配置oplog

    1.mongo server启动时添加–replSet single 参数

    服务重启后,第一次登入,执行其他命令时(例如:show dbs),会提示错误,错误信息如下:

    E QUERY [thread1] Error: listDatabases failed:{
    “ok” : 0,
    “errmsg” : “not master and slaveOk=false”,
    “code” : 13435,
    “codeName” : “NotMasterNoSlaveOk”
    }

    此时需要执行初始化命令:

    rs.initiate({ _id: “副本集名称”, members: [{_id:0,host:“服务器的IP:Mongo的端口号”}]})

    例如(对应上述配置):
    rs.initiate({ _id: “single”, members: [{_id:0,host:“192.168.144.249:27017”}]})

    执行完成后提示,代表执行成功:

    { “ok” : 1 }

    初始完副本集中唯一的节点,可能短时间显示为SECONDARY或OTHER。一般而言,稍等一会,就会自然恢复为primary,无需人工干预。

    rs:OTHER>
    rs:PRIMARY>
    rs:PRIMARY>

    查看local库里是否有oplog表

    collector.conf

    conf.version = 6
    # 增量获取
    sync_mode=incr
    #mongo_urls=mongodb://127.0.0.1:40001
    mongo_urls=mongodb://111.111.111.111:40001,111.111.111.112:40002,111.111.111.113:40003
     
    tunnel = kafka
    tunnel.address = mongo_shake_test@dn3.test.com:9092,dn4.test.com:9092,dn5.test.com:9092
    tunnel.message = json
    # 工作线程数,receiver.conf中的replayer oplog重放工作线程数应与此保持一致
    incr_sync.worker = 8
    incr_sync.mongo_fetch_method = oplog
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    执行./collector.linux -conf=collector.conf -verbose启动

    方案二:Flink CDC

    简介

    MongoDB CDC连接器通过伪装一个MongoDB集群里副本,利用MongoDB集群的高可用机制,该副本可以从master节点获取完整oplog(operation log)事件流。

    依赖条件

    MongoDB版本
    MongoDB version >= 3.6

    集群部署
    副本集 或 分片集群

    Storage Engine
    WiredTiger存储引擎。

    副本集协议版本
    副本集协议版本1 (pv1) 。
    从4.0版本开始,MongoDB只支持pv1。 pv1是MongoDB 3.2或更高版本创建的所有新副本集的默认值。

    需要的权限
    MongoDB Kafka Connector需要changeStream 和 read 权限。
    您可以使用下面的示例进行简单授权:
    更多详细授权请参考MongoDB数据库用户角色。

    use admin;
    db.createUser({
      user: "flinkuser",
      pwd: "flinkpw",
      roles: [
        { role: "read", db: "admin" }, //read role includes changeStream privilege 
        { role: "readAnyDatabase", db: "admin" } //for snapshot reading
      ]
    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    MongoDB 副本集和分片集

    副本集:高可用的部署模式,次要节点通过拷贝主要节点的操作日志来进行数据的复制。当主要节点发生故障时,次要节点和仲裁节点会重新发起投票来选出新的主要节点,实现故障转移。另外,次要节点还能分担查询请求,减轻主要节点的查询压力。

    分片集:水平扩展的部署模式,将数据均匀分散在不同 Shard 上,每个 Shard 可以部署为一个副本集,Shard 中主要节点承载读写请求,次要节点会复制主要节点的操作日志,能够根据指定的分片索引和分片策略将数据切分成多个 16MB 的数据块,并将这些数据块交给不同 Shard 进行存储。Config Servers 中会记录 Shard 和数据块的对应关系。

    MongoDB 的 Oplog 的限制

    MongoDB 的 Oplog 与 MySQL 的 Binlog 类似,记录了数据在 MongoDB 中所有的操作日志。Oplog 是一个有容量的集合,如果超出预设的容量范围,则会丢弃先前的信息。

    与 MySQL 的 Binlog 不同, Oplog 并不会记录变更前/后的完整信息。遍历 Oplog 的确可以捕获 MongoDB 的数据变更,但是想要转换成 Flink 支持的 Changelog 依然存在一些限制。

    首先,订阅 Oplog 难度较大。每个副本集会维护自己的 Oplog, 对于分片集群来说,每个 Shard 可能是一个独立的副本集,需要遍历每个 Shard 的 Oplog 并按照操作时间进行排序。另外, Oplog 没有包含变更文档前和变更后的完整状态,因此既不能转换成 Flink 标准的 Changelog ,也不能转换成 Upsert 类型的 Changelog 。这亦是我们在实现 MongoDB CDC Connector 的时候没有采用直接订阅 Oplog 方案的主要原因。

    最终选择使用 MongoDB Change Streams 方案来实现 MongoDB CDC Connector。

    Change Streams 是 MongoDB 3.6 版本提供的新特性,它提供了更简单的变更数据捕获接口,屏蔽了直接遍历 Oplog 的复杂度。Change Streams 还提供了变更后文档完整状态的提取功能,可以轻松转换成 Flink Upsert 类型的 Changelog。它还提供了比较完整的故障恢复能力,每一条变更记录数据都会包含一个 resume token 来记录当前变更流的位置。故障发生后,可以通过 resume token 从当前消费点进行恢复。

    另外, Change Streams 支持变更事件的筛选和定制化的功能。比如可以将数据库和集合名称的正则过滤器下推到 MongoDB 来完成,可以明显减少网络开销。它还提供了对集合库以及整个集群级别的变更订阅,能够支持相应的权限控制。

    方案三:手动监控oplog处理

    自己实现一个逻辑监控mongo的oplog日志,然后发送到目的端

    下面是一个监听的demo,这个逻辑可以在自定义Flink Souce的时候实现

    mongo_oplog_watcher.py

    import pymongo
    import re
    import time
    from pprint import pprint
    from pymongo.errors import AutoReconnect
    
    
    
    class OplogWatcher(object):
      def __init__(self, db=None, collection=None, poll_time=1.0, connection=None, start_now=True):
        if collection is not None:
          if db is None:
            raise ValueError('must specify db if you specify a collection')
          self._ns_filter = db + '.' + collection
        elif db is not None:
          self._ns_filter = re.compile(r'^%s\.' % db)
        else:
          self._ns_filter = None
    
        self.poll_time = poll_time
        self.connection = connection or pymongo.Connection()
    
        if start_now:
          self.start()
    
      @staticmethod
      def __get_id(op):
        id = None
        o2 = op.get('o2')
        if o2 is not None:
          id = o2.get('_id')
    
        if id is None:
          id = op['o'].get('_id')
    
        return id
    
      def start(self):
        #oplog = self.connection.local['oplog.$main']
        oplog = self.connection.local['oplog.rs']
        ts = oplog.find().sort('$natural', -1)[0]['ts']
        while True:
          if self._ns_filter is None:
            filter = {}
          else:
            filter = {'ns': self._ns_filter}
          filter['ts'] = {'$gt': ts}
          try:
            #cursor = oplog.find(filter, tailable=True)
            cursor = oplog.find(filter)
            while True:
              for op in cursor:
                ts = op['ts']
                id = self.__get_id(op)
                self.all_with_noop(ns=op['ns'], ts=ts, op=op['op'], id=id, raw=op)
              time.sleep(self.poll_time)
              if not cursor.alive:
                break
          except AutoReconnect:
            time.sleep(self.poll_time)
    
      def all_with_noop(self, ns, ts, op, id, raw):
        if op == 'n':
          self.noop(ts=ts)
        else:
          self.all(ns=ns, ts=ts, op=op, id=id, raw=raw)
    
      def all(self, ns, ts, op, id, raw):
        if op == 'i':
          self.insert(ns=ns, ts=ts, id=id, obj=raw['o'], raw=raw)
        elif op == 'u':
          self.update(ns=ns, ts=ts, id=id, mod=raw['o'], raw=raw)
        elif op == 'd':
          self.delete(ns=ns, ts=ts, id=id, raw=raw)
        elif op == 'c':
          self.command(ns=ns, ts=ts, cmd=raw['o'], raw=raw)
        elif op == 'db':
          self.db_declare(ns=ns, ts=ts, raw=raw)
    
      def noop(self, ts):
        pass
    
      def insert(self, ns, ts, id, obj, raw, **kw):
        print('insert',obj,raw)
        pass
    
      def update(self, ns, ts, id, mod, raw, **kw):
        print('update', mod,raw)
        pass
    
      def delete(self, ns, ts, id, raw, **kw):
        print('delete', id, raw)
        pass
    
      def command(self, ns, ts, cmd, raw, **kw):
        pass
    
      def db_declare(self, ns, ts, **kw):
        pass
    
    class OplogPrinter(OplogWatcher):
      def all(self, **kw):
        pprint (kw)
        print #newline
    
    if __name__ == '__main__':
      conn = pymongo.mongo_client.MongoClient("mongodb://root:tiger@master:27017")
      print(conn)
    
      oplog = conn.local['oplog.rs']
    
      print(oplog)
      ts = oplog.find().sort('$natural', -1)[0]['ts']
    
      print(ts)
    
    
      ow = OplogWatcher(connection=conn,db='cargo',collection='emp')
    
      ow.start();
      # OplogWatcher(connection=conn)
      # OplogPrinter(OplogWatcher)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122

    下面是一个监听mongo oplog实时到kafka的逻辑结构图

    在这里插入图片描述

  • 相关阅读:
    阿里本地生活全域日志平台 Xlog 的思考与实践
    深入解读[面向对象五大设计原则]
    互联网那些技术 | 秒杀库存解决方案
    JS高级(数据类型,数据_变量_内存)
    怎样评测对比报表工具的性能?
    QT QLineEdit显示模式掩码输入验证自动补全示范
    1802_在Linux系统上开发ARM单机片机嵌入式软件
    java中 基础 知识 逻辑控制
    在Ubuntu上配置CUDA基础环境
    操作符前提:各种进制与各种码(计算机基础)
  • 原文地址:https://blog.csdn.net/qq_42575907/article/details/127839578