• 数据迁移一致性测试探索与实践


    背景

    量级庞大的日志通过mysql不足以支撑业务需求,以前通过任务调度定时跑批从mysql同步到hive存储,这种方式时效性为T+1,也就是说今天的日志,明天才能同步到hive,总而言之时效性不高。为了提高时效性,改为流式计算flink实时同步

    • 那么作为测试人员,我们如何保证切换同步方式后的数据正确性呢?通过对比新旧表数据是否一致显然是最简单的方法
    • 这次改动涉及600多张表,每一张表的字段数基本在千以上,甚至部分表字段数达万以上,面对如此庞大的数据量,通过人眼一个个去对比显然不太现实

    探索与实践

    方案一:sql脚本
    SELECT column_names, COUNT(*) AS count_diff 
    FROM (
    	SELECT CONCAT_WS(',',A,B) FROM udc_test.s000 WHERE dt='20230814'
    	UNION ALL 
    	SELECT CONCAT_WS(',',A,B) FROM test.s000 WHERE dt = '20230814' and rule_log_id in (select rule_log_id from udc_test.s000) 
    ) AS combined
    GROUP BY column_names
    HAVING COUNT(column_names) = 1
    
    select * from (
    	select 'table1',A,B from udc_test.s000 WHERE dt='20230814' and rule_log_id in ('123456')
    	union all 
    	select 'table2',A,B from test.s000 WHERE dt='20230814' and rule_log_id in ('123456')
    )a order by a.table1 asc
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    方案二:python脚本
    from pyhive import hive
    from datetime import datetime
    
    if __name__ == '__main__':
        #换成生产的连接
        conn = hive.Connection(host="xxx", port='xxx', auth="xxx", database='xxx', username='xxx',password='xxx')
    
        #这里换成需要比较的表名
        tableName1 = 'test.ssc_python_compare_fields1'
        tableName2 = 'test.ssc_python_compare_fields2'
    
        current_time = datetime.now()
        hash_code = str(hash(current_time))
        # 获取表结构
        query1 = 'desc ' + tableName1
        query2 = 'desc ' + tableName2
        cursor = conn.cursor()
        cursor.execute(query1)
        columns1 = [row[0] for row in cursor.fetchall()]
        cursor.execute(query2)
        columns2 = [row[0] for row in cursor.fetchall()]
    
        # 去除掉不需要比较的字段
        columns1.remove('# Partition Information')
        columns1.remove('# col_name')
        columns1.remove('dt')
    
        columns2.remove('# Partition Information')
        columns2.remove('# col_name')
        columns2.remove('dt')
    
        set1 = set(columns1)
        set2 = set(columns2)
        # 取出来表1特有的字段,可以保存到文件
        diffrence1 = set1 - set2
        print(diffrence1)
        # 取出来表2特有的字段,可以保存到文件
        diffrence2 = set2 - set1
        print(diffrence2)
    
        # 取表1和表2共有的字段,用于比较差异
        intersection = set1 & set2
    
        # 生成比较的sql
        sql = 'select  '
        for element in intersection:
            sql = sql + 'if( nvl(t1.' + element + ',' + hash_code + ' )!= nvl( t2.' + element + ',' + hash_code + ') , \'no\',\'yes\') as ' + element + ' , '
        #print(sql)
        sql = sql[:-2]
        #print(sql)
        #sql中的dt可以改成具体需要比较的日期
        sql = sql + ' from ' + tableName1 + ' as t1 left join  ' + tableName2 \
              + ' as t2  on  t1.rule_log_id=t2.rule_log_id ' \
                ' and t1.dt= \'20230815\' and t2.dt = \'20230815\'  and t1.apply_type=t2.apply_type   where  '
        for element in intersection:
            sql = sql + ' t1.' + element + '!=t2.' + element + ' or '
    
        sql = sql[:-3]
        print(sql)
    
        sql = sql + ' limit 1 '
    
        # 执行sql,获取到结果,如果两列不相等的话,值为no,相等的话值为yes
        cursor.execute(sql)
        result = cursor.fetchone()
        # print(result)
    
        # 获取上述sql的元数据信息
        metadatas = cursor.description
        print('============================================================')
        # 遍历结果集,查找出比较结果不相同的数据,拿到列名
        index = 0
        while index < len(metadatas):
            if (result[index] != 'yes'):
                print(metadatas[index][0])
            index += 1
        print('============================================================')
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
  • 相关阅读:
    【Leetcode每日一题】 递归 - 反转链表(难度⭐)(35)
    【微前端开发环境下,加载远程子应用的实战。】
    虹科 | 解决方案 | 新能源车EV测试解决方案
    【CNN-GRU预测】基于卷积神经网络-门控循环单元的单维时间序列预测研究(Matlab代码实现)
    【图像配准】基于SURF特征实现印刷体汉字配准附matlab代码
    SpringApplication对应可配置属性作用
    docker搭建Elasticsearch、Kibana、Logstash 同步mysql数据到ES
    最新AIGC创作系统ChatGPT网站源码,Midjourney绘画系统,支持最新GPT-4-Turbo模型,支持DALL-E3文生图
    数学建模之Matlab基础操作
    【ArcGIS Engine开发入门】1.简单显示地图LicenseContro,ToolbarControl,TOCControl,MapControl
  • 原文地址:https://blog.csdn.net/qq_39537481/article/details/133950488