• TDengine Data Migration: Data Comparison


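    Verifying data integrity and consistency is a necessary step after any data migration, and TDengine migrations are no exception. However, TDengine usually stores massive volumes of data, often tens of billions of rows, so comparing the two sides by hand would be an enormous amount of work.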

    The script below compares the record counts of two databases. It works as follows:

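    1. Read the list of supertables.
    2. Read the time range.
    3. Query each subtable's row count with select count(*) from <supertable> where _c0>='<start>' and _c0<'<end>' group by tbname; (_c0 is the first column, which in TDengine is always the timestamp; on 3.x, tbname is also selected explicitly).
    4. Check whether the source and destination counts match for each subtable.
    5. Print the comparison result.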
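
    Steps 3 and 4 can be tried out without a database connection. The sketch below builds the per-version count query and then diffs two {subtable: count} dictionaries. The helper names build_count_sql and diff_counts, the database name power, and the sample counts are all hypothetical; the SQL mirrors the script's get_data() with a half-open [start, end) range.

```python
from typing import Dict, Set, Tuple


def build_count_sql(dbname: str, stbname: str, version: int, stime: str, etime: str) -> str:
    """Build the per-subtable count query (hypothetical helper mirroring get_data())."""
    # 3.x needs tbname selected explicitly; 2.x appends the GROUP BY column by itself.
    columns = "count(*)" if version == 2 else "count(*),tbname"
    return (f"select {columns} from `{dbname}`.`{stbname}` "
            f"where _c0>='{stime}' and _c0<'{etime}' group by tbname;")


def diff_counts(source: Dict[str, int], destination: Dict[str, int]) -> Tuple[Set[str], Set[str]]:
    """Return (lost, diff): subtables missing from destination, and subtables whose counts differ."""
    lost = {tb for tb in source if tb not in destination}
    diff = {tb for tb, n in source.items() if tb in destination and destination[tb] != n}
    return lost, diff


if __name__ == "__main__":
    print(build_count_sql("power", "meters", 3, "2023-01-01T00:00:00Z", "2023-10-01T00:00:00Z"))
    # Made-up counts, for illustration only.
    src = {"d1001": 100, "d1002": 250, "d1003": 80}
    dst = {"d1001": 100, "d1002": 249}
    lost, diff = diff_counts(src, dst)
    print(f"Lost tables: {sorted(lost)}, Diff tables: {sorted(diff)}")
```

    Like the script, this only reports subtables that exist in the source; extra subtables in the destination are not flagged.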

    To stay compatible with both TDengine 2.x and 3.x, all queries go through the RESTful interface.
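
    The two major versions report success differently over REST: the script's check_return() reads status ("succ") on 2.x and code (0) on 3.x, while the result rows come from data in both. A minimal sketch of that normalization follows; the function name parse_counts and the sample payloads are hypothetical and trimmed to just the fields the script reads.

```python
import json
from typing import Dict


def parse_counts(body: str, version: int) -> Dict[str, int]:
    """Turn a TDengine REST response into {subtable: count} (hypothetical helper).

    Assumes each row in "data" is [count, tbname], as in the script's get_data().
    """
    payload = json.loads(body)
    # 2.x signals success with status == "succ"; 3.x with code == 0.
    if version == 2:
        ok = payload.get("status") == "succ"
    else:
        ok = payload.get("code") == 0
    if not ok:
        raise RuntimeError(f"query failed: {body}")
    return {row[1]: row[0] for row in payload.get("data", [])}


if __name__ == "__main__":
    # Illustrative payloads only, trimmed to the fields used above.
    v2_body = '{"status": "succ", "data": [[100, "d1001"], [250, "d1002"]], "rows": 2}'
    v3_body = '{"code": 0, "data": [[100, "d1001"], [250, "d1002"]], "rows": 2}'
    print(parse_counts(v2_body, 2) == parse_counts(v3_body, 3))
    print(parse_counts(v3_body, 3))
```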

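    Usage:

    1. Put the names of the supertables to compare, one per line, into a file named stblist in the same directory as the script. All of them must belong to the same database.
    2. Fill in the source and destination connection details (URL, username, password, database name, and TDengine major version, 2 or 3) in config.ini, under the [source] and [destination] sections.
    3. Run the script: python3 datacompare.py 2023-01-01T00:00:00Z 2023-10-01T00:00:00Z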
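
    main() reads config.ini with Python's configparser and expects [source] and [destination] sections, each containing url, username, password, dbname and version. The sketch below shows one possible layout and checks it the way main() builds its dictionaries. Host names, the database name and the credentials are placeholders (root/taosdata is only TDengine's default account), the URL assumes the default REST endpoint on port 6041, and load_db_info is a hypothetical helper.

```python
import configparser

# Placeholder values: replace hosts, credentials and database names with your own.
SAMPLE_CONFIG = """
[source]
url = http://source-host:6041/rest/sql
username = root
password = taosdata
dbname = power
version = 2

[destination]
url = http://dest-host:6041/rest/sql
username = root
password = taosdata
dbname = power
version = 3
"""

REQUIRED_KEYS = ("url", "username", "password", "dbname", "version")


def load_db_info(config: configparser.ConfigParser, section: str) -> dict:
    """Build the same dict main() builds, failing early on a missing key (hypothetical helper)."""
    missing = [k for k in REQUIRED_KEYS if k not in config[section]]
    if missing:
        raise KeyError(f"[{section}] is missing: {', '.join(missing)}")
    info = {k: config[section][k] for k in REQUIRED_KEYS}
    info["version"] = int(info["version"])  # main() also converts version to int
    return info


if __name__ == "__main__":
    config = configparser.ConfigParser()
    config.read_string(SAMPLE_CONFIG)
    print(load_db_info(config, "source"))
    print(load_db_info(config, "destination"))
```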

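    Notes:

    1. Times must be in ISO 8601 format, for example 2023-01-01T00:00:00Z or 2023-01-01T08:00:00+08:00.
    2. If no time range is given, it defaults to 2000-01-01T00:00:00.000+00:00 through 2023-10-01T00:00:00.000+00:00.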
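
    One portability detail behind note 1: datetime.fromisoformat() accepts a trailing Z only from Python 3.11 onward, so the 2023-01-01T00:00:00Z form from the usage example is rejected by older interpreters. A minimal sketch that normalizes Z first and also rejects a reversed range; to_epoch and parse_range are hypothetical names, and treating strings without an offset as UTC is an assumption of this sketch.

```python
import datetime
from typing import Tuple


def to_epoch(text: str) -> int:
    """Parse an ISO 8601 string into Unix seconds, accepting a trailing 'Z' on Python 3.7+."""
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"  # fromisoformat() understands 'Z' only from Python 3.11
    dt = datetime.datetime.fromisoformat(text)
    if dt.tzinfo is None:
        # Assumption for this sketch: a string without an offset is treated as UTC.
        dt = dt.replace(tzinfo=datetime.timezone.utc)
    return int(dt.timestamp())


def parse_range(start: str, end: str) -> Tuple[int, int]:
    """Validate a [start, end) range and return it as epoch seconds (hypothetical helper)."""
    s, e = to_epoch(start), to_epoch(end)
    if s >= e:
        raise ValueError(f"start {start} must be earlier than end {end}")
    return s, e


if __name__ == "__main__":
    print(parse_range("2023-01-01T00:00:00Z", "2023-10-01T00:00:00.000+00:00"))
```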

    Full script (datacompare.py):

    import requests
    import sys
    import datetime
    import json
    from requests.auth import HTTPBasicAuth
    import configparser
    
    
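    def arg_j(sarg):
        """Validate an ISO 8601 time string and return it as a Unix timestamp (seconds)."""
        # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11 on,
        # so rewrite it as an explicit UTC offset first.
        text = sarg[:-1] + "+00:00" if sarg.endswith("Z") else sarg
        try:
            # .timestamp() is portable, unlike strftime('%s'), which is platform-specific
            # (a glibc extension) and ignores the parsed UTC offset.
            return int(datetime.datetime.fromisoformat(text).timestamp())
        except ValueError:
            sys.exit(f"{sarg}: only ISO 8601 time strings are supported!")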
    
    
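    def request_post(url, sql, user, pwd):
        """POST an SQL statement to the TDengine REST endpoint and return the response body."""
        headers = {
            'Connection': 'keep-alive',
            # No 'br': requests only decodes Brotli when a Brotli package is installed.
            'Accept-Encoding': 'gzip, deflate',
        }
        try:
            result = requests.post(url, data=sql.encode("utf-8"),
                                   auth=HTTPBasicAuth(user, pwd), headers=headers)
            return result.content.decode("utf-8")
        except (requests.RequestException, UnicodeDecodeError) as e:
            # Returning None here would make json.loads() fail later with a confusing TypeError.
            sys.exit(f"Request to {url} failed: {e}")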
    
    
    def check_return(result, tdversion):
        """Check result of request."""
        if tdversion == 2:
            datart = json.loads(result).get("status")
        else:
            datart = json.loads(result).get("code")
            
        if str(datart) == 'succ' or str(datart) == '0':
            chkrt = 'succ'
        else:
            chkrt = 'error'
        return chkrt
    
    
    def get_data(stbname, url, username, password, dbname, version, stime, etime):
        """Return {subtable name: row count} for one supertable over the range [stime, etime)."""
        data = dict()
        # On 2.x the GROUP BY column is appended to the result automatically; on 3.x tbname
        # must be selected explicitly. Either way, each result row is [count, tbname].
        # The end bound is exclusive, so adjacent time ranges never count a row twice.
        if version == 2:
            sql = f"select count(*) from `{dbname}`.`{stbname}` where _c0>='{stime}' and _c0<'{etime}' group by tbname;"
        else:
            sql = f"select count(*),tbname from `{dbname}`.`{stbname}` where _c0>='{stime}' and _c0<'{etime}' group by tbname;"

        rt = request_post(url, sql, username, password)
        code = check_return(rt, version)

        if code != 'error':
            for row in json.loads(rt).get("data"):
                data[row[1]] = row[0]
        else:
            print(rt)
        return data
    
    
    def compare_data(source_info, destination_info, stime, etime):
        """Compare data between source database and destination database."""
        tb_lost = set()
        tb_diff = set()
    
        with open('stblist', 'r') as sfile:
            for stbname in sfile:
                stbname = stbname.strip()
                if not stbname:
                    continue  # skip blank lines in stblist
                
                source_data = get_data(stbname, **source_info, stime=stime, etime=etime)
                destination_data = get_data(stbname, **destination_info, stime=stime, etime=etime)
            
                for key, source_value in source_data.items():
                    destination_value = destination_data.get(key)
    
                    if destination_value is None:
                        tb_lost.add(key)
                        print(f'Table {key} not exist in destination DB {destination_info["dbname"]}')
                    elif destination_value != source_value:
                        tb_diff.add(key)
                        print(f'Table {key} has different values between source and destination, source is {source_value}, destination is {destination_value}.')
                        
        print("Lost tables: {}, Diff tables: {}.".format(len(tb_lost), len(tb_diff)))
    
    
    def main():
        config = configparser.ConfigParser()
        config.read('config.ini')
    
        source_info = {
            'url': config['source']['url'],
            'username': config['source']['username'],
            'password': config['source']['password'],
            'dbname': config['source']['dbname'],
            'version': int(config['source']['version']),
        }
    
        destination_info = {
            'url': config['destination']['url'],
            'username': config['destination']['username'],
            'password': config['destination']['password'],
            'dbname': config['destination']['dbname'],
            'version': int(config['destination']['version']),
        }
    
        if len(sys.argv) >= 3:
            stime = str(sys.argv[1])
            etime = str(sys.argv[2])
        else:
            stime = '2000-01-01T00:00:00.000+00:00'
            etime = '2023-10-01T00:00:00.000+00:00'
        arg_j(stime)
        arg_j(etime)
    
        compare_data(source_info, destination_info, stime, etime)
    
    if __name__ == "__main__":
        main()
    

    The code above was modified by AI and is not guaranteed to run successfully.

    I have also uploaded a debugged version of the code for download.

  • Original article: https://blog.csdn.net/weixin_43700866/article/details/134338935