• 用Python脚本迁移MongoDB数据到金仓-kingbase数据库


    1、首先需要明确MongoDB与kingbase的对应关系:collection相当于表(table),field相当于字段(column),根据这个对应关系创建表;

    此次迁移的MongoDB里的数据字段是:_id(自动生成的objectid),image(转成二进制存储的文档)

    所以在金仓里创建表 create table admin(id varchar,image bytea);注意表名要与迁移脚本中INSERT语句使用的表名一致(下面脚本里写的是 dzzzwj)。另外 _id 是 ObjectId 类型,需要先转成字符串才能写入 varchar 字段。

    2、安装Python环境,由于是内网环境,没有yum源,需要从能连接互联网的环境下载好相应的安装包

    Python:3.9.0版本

    用到以下这些包

    import pymongo
    import ksycopg2
    import concurrent.futures
    from ksycopg2 import pool
    import logging
    from urllib.parse import quote_plus

    ------------------------------------------------------------------------------------

    pip download pymongo -d pymongo_packages  # 在能连接互联网的机器上下载pymongo及其依赖包

    pip3 install --no-index --find-links=pymongo_packages pymongo  # 把pymongo_packages目录拷贝到内网机器后安装pymongo库(两台机器的Python版本和操作系统平台需一致)

    金仓的Python驱动(ksycopg2)可以到金仓官网下载,需要选择与Python版本对应的驱动包。注意:下面的迁移脚本中写的是 psycopg2,如果使用金仓驱动,需要把脚本中的 psycopg2 全部替换为 ksycopg2。

    以下是Python脚本内容:

    复制代码
    import pymongo
    import psycopg2
    import concurrent.futures
    from psycopg2 import pool
    import logging
    from urllib.parse import quote_plus
    import os
    
    # Initialise logging
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')

    # MongoDB settings. Each value can be overridden by an environment variable so
    # credentials do not have to be edited into the script; the defaults are the
    # original hard-coded values.
    username = os.environ.get('MONGO_USER', 'admin')
    password = os.environ.get('MONGO_PASSWORD', 'SCJGscjg@123')
    host = os.environ.get('MONGO_HOST', '10.253.228.41')
    port = os.environ.get('MONGO_PORT', '27017')
    # Percent-encode the credentials: characters such as '@' in the password would
    # otherwise be parsed as URI delimiters.
    encoded_username = quote_plus(username)
    encoded_password = quote_plus(password)
    uri = f"mongodb://{encoded_username}:{encoded_password}@{host}:{port}/"
    mongo_client = pymongo.MongoClient(uri)
    mongo_db = mongo_client['admin']
    mongo_collection = mongo_db['admin']

    # Kingbase connection pool used by batch_insert(); connection settings can be
    # overridden by environment variables, defaulting to the original values.
    kb_pool = psycopg2.pool.ThreadedConnectionPool(
        minconn=1,
        maxconn=20,
        host=os.environ.get('KB_HOST', "10.253.228.110"),
        database=os.environ.get('KB_DATABASE', "mongo"),
        user=os.environ.get('KB_USER', "system"),
        password=os.environ.get('KB_PASSWORD', "1"),
        port=os.environ.get('KB_PORT', "54322"),
    )

    # File holding the skip() offset from which the next run resumes
    OFFSET_FILE = 'offset.txt'
    
    def read_offset():
        """Return the saved migration offset, or 0 if no offset file exists yet.

        Raises ValueError if the file exists but does not contain an integer.
        """
        try:
            with open(OFFSET_FILE, 'r') as checkpoint:
                saved_text = checkpoint.read()
        except FileNotFoundError:
            # No checkpoint yet: start from the first document.
            return 0
        return int(saved_text.strip())
    
    def write_offset(offset):
        """Persist *offset* to OFFSET_FILE atomically.

        The value is first written (and fsynced) to a temporary file next to
        OFFSET_FILE and then moved over it with os.replace(). Writing in place
        would truncate the file first, so a crash mid-write could leave an empty
        checkpoint that read_offset() cannot parse.
        """
        tmp_path = f"{OFFSET_FILE}.tmp"
        with open(tmp_path, 'w') as f:
            f.write(str(offset))
            f.flush()
            # Make sure the bytes are on disk before the rename publishes them.
            os.fsync(f.fileno())
        os.replace(tmp_path, OFFSET_FILE)
    
    def batch_insert(mongo_data):
        """Insert one batch of MongoDB documents into kingbase in a single transaction.

        Each document's ``_id`` is written (as a string) to column ``id`` and its
        ``image`` field to column ``image`` of table ``dzzzwj``.

        Args:
            mongo_data: iterable of MongoDB documents, each with ``_id`` and
                ``image`` keys.

        Returns:
            True if the whole batch was committed; False if anything failed, in
            which case the error is logged and the transaction is rolled back.
            The connection is always returned to ``kb_pool``.
        """
        insert_query = "INSERT INTO dzzzwj(id, image) VALUES (%s, %s)"
        kb_conn = None
        try:
            kb_conn = kb_pool.getconn()
            # A bson ObjectId is not a type the DB driver can adapt to SQL; its
            # hex-string form can be stored in a varchar column.
            rows = [(str(data['_id']), data['image']) for data in mongo_data]
            with kb_conn.cursor() as kb_cursor:
                kb_cursor.executemany(insert_query, rows)
            kb_conn.commit()
            return True
        except Exception as e:
            logging.error(f"批量插入错误: {e}", exc_info=True)
            if kb_conn is not None:
                # Don't hand a connection with an aborted transaction back to the pool.
                try:
                    kb_conn.rollback()
                except Exception as rollback_error:
                    logging.warning(f"Rollback failed: {rollback_error}")
            return False
        finally:
            if kb_conn is not None:
                kb_pool.putconn(kb_conn)
    
    def main(batch_size=80, max_retries=3):
        """Migrate the MongoDB collection into kingbase batch by batch, resumably.

        Documents are read in ascending ``_id`` order with skip/limit paging,
        starting at the offset returned by read_offset(). After batch_insert()
        commits a batch, the offset of the next batch is saved with
        write_offset(), so an interrupted run resumes where it stopped.

        Batches are processed one at a time because the checkpoint may only
        advance after every earlier batch has been committed.

        Args:
            batch_size: number of documents fetched and inserted per batch; must be
                positive (MongoDB treats ``limit(0)`` as "no limit").
            max_retries: extra attempts a failing batch gets before the migration
                stops. The offset file still points at that batch, so rerunning
                the script retries it.

        Raises:
            ValueError: if ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")

        try:
            offset = read_offset()
            while True:
                # skip/limit paging needs a deterministic order: without sort()
                # MongoDB does not guarantee the same order on every query, so a
                # resumed run could skip or duplicate documents.
                # NOTE(review): skip() cost grows with the offset; for very large
                # collections, paging with {'_id': {'$gt': last_id}} is faster but
                # would change what the offset file stores.
                mongo_data = list(
                    mongo_collection.find()
                    .sort('_id', pymongo.ASCENDING)
                    .skip(offset)
                    .limit(batch_size)
                )
                if not mongo_data:
                    break

                succeeded = False
                for _ in range(max(max_retries, 0) + 1):
                    succeeded = batch_insert(mongo_data)
                    if succeeded:
                        break
                    logging.error(f"Batch failed with offset {offset}")
                if not succeeded:
                    # Stop instead of retrying the same batch forever; the offset
                    # file is left pointing at this batch.
                    logging.error(f"Giving up on batch at offset {offset}; rerun the script to resume from it")
                    break

                logging.info(f"Batch completed with offset {offset}")
                # Advance by the real batch length (the last batch may be short).
                offset += len(mongo_data)
                write_offset(offset)
        except Exception as e:
            logging.error(f"主循环错误: {e}", exc_info=True)
        finally:
            mongo_client.close()
            kb_pool.closeall()
            logging.info("资源已清理完毕。")

    if __name__ == "__main__":
        main()
    复制代码

    这段代码思路:

    (1)连接MongoDB和kingbase数据库;

    (2)因为MongoDB数据量比较大,并且需要断点续传,所以用了分页查询(skip + limit)。注意分页必须配合按固定字段(如 _id)排序,否则MongoDB不保证每次查询返回的顺序一致,断点续传时可能漏数据或重复迁移;

    (3)每批数据成功插入金仓数据库后,增加偏移量,并把当前偏移量记录在offset.txt中,这样脚本中断后重新启动,可以从上次的位置继续迁移数据;

    因为同一份二进制数据分别从MongoDB和金仓数据库中查询出来后,显示的内容看着不一样,无法直接比对,所以下面是分别计算两边数据的md5值进行对比的简单代码:

    复制代码
    import pymongo
    import ksycopg2
    import base64
    import hashlib
    
    def compute_hash(data):
        """Return the hex MD5 digest of a bytes-like object (bytes, bytearray, memoryview)."""
        digest = hashlib.md5()
        digest.update(data)
        return digest.hexdigest()
    
    # Copy all documents from MongoDB into kingbase table zzwj, then read them back
    # and compare the MD5 digest of each image on both sides, matched by _id.
    mongo_client = pymongo.MongoClient('mongodb://127.0.0.1:27017/')
    mongo_db = mongo_client['admin']
    mongo_collection = mongo_db['mongodb']

    database = "test"
    user = "system"
    password = "1"
    host = "127.0.0.1"
    port = "54322"

    conn = ksycopg2.connect(database=database, user=user, password=password, host=host, port=port)
    cursor = conn.cursor()

    try:
        # MD5 of every binary image read from MongoDB, keyed by str(_id).
        mongo_hashes = {}
        insert_query = "INSERT INTO zzwj(_id, image) VALUES (%s, %s)"
        for data in mongo_collection.find():
            # The driver cannot adapt a bson ObjectId; store its hex string instead.
            id_value = str(data['_id'])
            image_data = data['image']
            if isinstance(image_data, bytes):
                mongo_hashes[id_value] = compute_hash(image_data)
            if id_value and image_data:
                cursor.execute(insert_query, (id_value, image_data))

        # Commit the inserts
        conn.commit()

        # Read the rows back and compare each image's MD5 with the MongoDB one.
        # NOTE(review): assumes zzwj._id is a text column so it matches str(_id) keys.
        cursor.execute("select _id, image from zzwj")
        mismatches = 0
        for row_id, image_byte in cursor.fetchall():
            # bytea may come back as memoryview; compute_hash accepts any buffer.
            pg_hash = compute_hash(image_byte) if image_byte is not None else None
            mongo_hash = mongo_hashes.get(row_id)
            same = pg_hash is not None and pg_hash == mongo_hash
            if not same:
                mismatches += 1
            print(f"{row_id}  mongo={mongo_hash}  kingbase={pg_hash}  {'OK' if same else 'MISMATCH'}")
        print(f"{mismatches} mismatched row(s)")
    finally:
        # Close connections even if the copy or comparison fails.
        cursor.close()
        conn.close()
        mongo_client.close()
    复制代码

     

  • 相关阅读:
    python链表_递归求和_递归求最大小值
    NKOJP5682果老师炸桥
    数据库管理-第115期 too many open files(202301107)
    助力工业物联网,工业大数据之客户回访事实指标需求分析【二十三】
    Redis
    linux进程间通讯--信号量
    架构设计|基于 raft-listener 实现实时同步的主备集群
    微信小程序简介
    浅谈基于物联网的医院消防安全管理
    java-php-net-python-图书馆选择计算机毕业设计程序
  • 原文地址:https://www.cnblogs.com/weisu-koko/p/18225227