• DataFrame,数据列筛选代替遍历每一行数据去判断,大大提高数据过滤速度


    运行环境

    笔记本win10,i5-6200U,16G
    python3.8
    pandas1.3.5

    程序目的

    在一张接近45M的csv表格,63.7w行数据,从中提取符合时间段范围的(每天20到第二天9点的数据),并且信号数值小于一定范围的数据,接着按每天为一份,去重统计和提取这些数据的原始详细信息
    在这里插入图片描述

    最原始的运算方式,一行一行遍历去判断每个单元格

    def date_clean(file: str, time_interval: tuple, distance: tuple):
        """
        按时间区间,信号范围来提取出数据
        :param file: 读取csv文件
        :param time_interval: 开始小时和结束小时,元组类型(20,9)20点到9点
        :param distance: 开始结束信号,元组类型(500,30000) 500到30000
        :return:
        """
        df_list = []  # 存储筛选好的数据,先存列表或字典,最后再转为DataFrame能缩短一半时间
        print(f"loading... {datetime.datetime.now()}")
        data_frame = pd.read_csv(file, encoding='utf-8', dtype="string[pyarrow]")  # 防止读取长数值时,用的是科学计数法导致精度丢失
        print(f"加载完毕... {datetime.datetime.now()}")
        start_time = datetime.time(time_interval[0], 0, 0)
        end_time = datetime.time(time_interval[1], 0, 0)
        if time_interval[0] > time_interval[1]:  # 如果时间区间是跨天
    
            for ind, row in data_frame.iterrows():
                if (datetime.time(time_interval[0], 0, 0) <= pd.to_datetime(row["采集时间"]).time() or
                        pd.to_datetime(row["采集时间"]).time() <= datetime.time(time_interval[1], 0, 0)):  # 只比较小时:
                    if distance[0] <= int(float(row["信号"])) <= distance[1]:
                        #print(data_frame.loc[[row]])
                        #print(row)
                        df_list.append(row.to_list())
            
        else:
            pass
        df = data_frame.reset_index(drop=True)
    
        #df = pd.DataFrame(df_list, columns=["数据", "地点", "采集时间", "归属地", "信号"])  # to_list()后没有了表头,需要手动添加
       # .reset_index()列表里存的是Series,他还有一组与数组数据对应的标签索引。转为DataFrame后需要重设index
    
        #print(df)
        print(f"依据时间和信号值过滤的数据比为 {len(df)}/{len(data_frame)}")
        print(f"数据清理... {datetime.datetime.now()}")
    
        subsection_count_filt(df, time_interval, 28)
    
    
    def subsection_count_filt(full_data: pd.DataFrame, time_interval: tuple, times: int):
        """
        按时间段划分为一段数据区间,在从去重后的唯一值列表去筛选出这段区间内出现的次数
        :param unique_data: 去重后的唯一值列表
        :param full_data: 全量数据
        :param time_interval: 时间范围区间,(开始时间,结束时间)
        :param times: 筛选出现的次数 大于等于
        :return:
        """
        full_result_list = []  # 保存统计次数的结果
        full_result_details = pd.DataFrame()   # 保存结果的详情内容
        dt = full_data["采集时间"].str.split(' ', expand=True)  # 将年月日 时分秒拆分成两列
        # print(dt)
        # full_data["年月日"] = dt[0]
        # full_data["时分秒"] = dt[1]
        day_times = dt[0].drop_duplicates().to_list()  # 获取表格中包含的日期
        day_times.sort()
    
        for day in day_times:  # 按时间归为一组
            print(day)
            day_time1 = datetime.datetime(int(day.split('-')[0]), int(day.split('-')[1]), int(day.split('-')[2]), time_interval[0], 0, 0)
            day_time2 = datetime.datetime(int(day.split('-')[0]), int(day.split('-')[1]), int(day.split('-')[2]), time_interval[1], 0, 0) + datetime.timedelta(days=1)
            df_day_list = []  # 分类一天的数据
            to_drop = []
            have_data = False  # 判断插入了数据
            #for row in range(0, len(full_data)):
            for index, row in full_data.iterrows():
                if day_time1 <= pd.to_datetime(row["采集时间"]) <= day_time2:
                    # 一天内的数据汇总统计
                    df_day_list.append(row.to_list())
                    to_drop.append(index)
                    have_data = True
            # 删除已不需要的后能加快速度下次遍历速度
            full_data = full_data.drop(to_drop)  # 快了1/3
    
            if have_data:
                df_day = pd.DataFrame(df_day_list, columns=["数据", "地点", "采集时间", "归属地", "信号"])
                one_day_counts = df_day["数据"].value_counts()  # 计数
                one_day_result = one_day_counts[one_day_counts >= times]  # 筛选出现次数大于n次的
                result = pd.DataFrame({'数据': one_day_result.index, '次数': one_day_result.values})
                # print(result)
                for ind, row in result.iterrows():
    
                    full_result_list.append({"日期": day, "数据": row["数据"], "次数": row["次数"]})
                    full_result_details = pd.concat([full_result_details, (df_day[df_day["数据"] == row["数据"]])], ignore_index=True)  # 从一天数据中检索并插入详情,concat拼接
                # print(full_result)
    
        full_result = pd.DataFrame(full_result_list)
    
        # 结果去重
        print("结果去重")
        print(full_result.drop_duplicates(subset="数据").reset_index())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90

    运行时间 900s+,时间太长了,查资料说用pandas2.0之后的,可以用pyarrow来提高速率,但是2.0很多方法不适用了,影响到之前的一些脚本运行,后来仔细想了想,是遍历那里的时间复杂度太大了,提取出结果的数据,大概要遍历10亿次,决定从这里的循环逻辑做改善

    取消遍历的方式,用DataFrame本身的数据筛选

    之前一直难到的点是,觉得时间这一列要提取出来转换成time后才能作比较,现在找到直接转换成时间而不保留日期的函数了

    def date_clean(file: str, time_interval: tuple, distance: tuple):
        """
        按时间区间,信号范围来提取出数据
        :param file: 读取csv文件
        :param time_interval: 开始小时和结束小时,元组类型(20,9)20点到9点
        :param distance: 开始结束信号,元组类型(500,30000) 500到30000
        :return:
        """
        df_list = []  # 存储筛选好的数据,先存列表或字典,最后再转为DataFrame能缩短一半时间
        print(f"loading... {datetime.datetime.now()}")
        data_frame = pd.read_csv(file, encoding='utf-8', dtype="string[pyarrow]")  # 防止读取长数值时,用的是科学计数法导致精度丢失
        print(f"加载完毕... {datetime.datetime.now()}")
        start_time = datetime.time(time_interval[0], 0, 0)
        end_time = datetime.time(time_interval[1], 0, 0)
        if time_interval[0] > time_interval[1]:  # 如果时间区间是跨天
            #  !!! 取消用一行行遍历的方式,来减少时间复杂度,直接根据列值来筛选后使用
            data_frame["小时间"] = pd.to_datetime(data_frame["采集时间"]).dt.time  # 只提取小时
            data_frame["信号"] = pd.to_numeric(data_frame["信号"])  # 转换数值行
            data_frame = data_frame.loc[(data_frame["小时间"] >= start_time) | (data_frame["小时间"] <= end_time)]  # 跨天时间用 或
            data_frame = data_frame.loc[(data_frame["信号"] >= float(distance[0])) & (data_frame["信号"] <= float(distance[1]))]
            data_frame.drop(['小时间'], axis=1, inplace=True)  # 删除小时
    
        else:
            pass
        df = data_frame.reset_index(drop=True)
        #print(df)
        print(f"依据时间和信号值过滤的数据比为 {len(df)}/{len(data_frame)}")
        print(f"数据清理... {datetime.datetime.now()}")
    
        # print(f"再次去重后 {len(df_unique)}/{find_num}")
    
        subsection_count_filt(df, time_interval, 28)
    
    
    def subsection_count_filt(full_data: pd.DataFrame, time_interval: tuple, times: int):
        """
        按时间段划分为一段数据区间,在从去重后的唯一值列表去筛选出这段区间内出现的次数
        :param unique_data: 去重后的唯一值列表
        :param full_data: 全量数据
        :param time_interval: 时间范围区间,(开始时间,结束时间)
        :param times: 筛选出现的次数 大于等于
        :return:
        """
        full_result_list = []  # 保存统计次数的结果
        full_result_details = pd.DataFrame()   # 保存结果的详情内容
        dt = full_data["采集时间"].str.split(' ', expand=True)  # 将年月日 时分秒拆分成两列
    
        day_times = dt[0].drop_duplicates().to_list()  # 获取表格中包含的日期
        day_times.sort()
    
        full_data["采集时间"] = pd.to_datetime(full_data["采集时间"])  # 转换为时间格式
    
        for day in day_times:  # 按时间归为一组
            print(day)
            #  划分一组的开始时间和结束时间
            day_time1 = datetime.datetime(int(day.split('-')[0]), int(day.split('-')[1]), int(day.split('-')[2]), time_interval[0], 0, 0)
            day_time2 = datetime.datetime(int(day.split('-')[0]), int(day.split('-')[1]), int(day.split('-')[2]), time_interval[1], 0, 0) + datetime.timedelta(days=1)
    
            #  取消使用上面的遍历模式
            df_day = full_data.loc[(full_data["采集时间"] >= day_time1) & (full_data["采集时间"] <= day_time2)]  # 具体日期时间用 与
    
            if len(df_day) != 0:  # 有数据
    
                df_day = df_day.reset_index(drop=True)
                one_day_counts = df_day["数据"].value_counts()  # 计数
                one_day_result = one_day_counts[one_day_counts >= times]  # 筛选出现次数大于n次的
                result = pd.DataFrame({'数据': one_day_result.index, '次数': one_day_result.values})
                # print(result)
                for ind, row in result.iterrows():
    
                    full_result_list.append({"日期": day, "数据": row["数据"], "次数": row["次数"]})
                    full_result_details = pd.concat([full_result_details, (df_day[df_day["数据"] == row["数据"]])], ignore_index=True)  # 从一天数据中检索并插入详情,concat拼接
                # print(full_result)
    
        full_result = pd.DataFrame(full_result_list)
    
        # 结果去重
        print("结果去重")
        print(full_result.drop_duplicates(subset="数据").reset_index())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79

    经过这种方法后,运算时间在30s+,快了30倍啊!!!,所以有时候运行慢,找到不是系统io读写文件这种问题后,考虑一下程序的时间复杂度

  • 相关阅读:
    Flask框架——Flask-Mail邮件
    扩散模型 - Diffusion Model【李宏毅2023】学习笔记 数学原理篇
    二本4年Java经验,五面阿里艰苦经历(定薪45K),回馈一波心得体会
    Spring Boot 使用自定义注解和自定义线程池实现异步日志记录
    [附源码]JAVA毕业设计基于Ssm学生信息管理系统(系统+LW)
    EBS R12.2.0升级到R12.2.6
    HashMap详解
    【华为OD机考B卷 | 100分】整数编码(JAVA题解——也许是全网最详)
    2022.8前端面试准备题
    QCC51XX-QCC30XX系列开发教程(实战篇) 之 12.5-空间音频用开发板调试配置说明
  • 原文地址:https://blog.csdn.net/PengDW12/article/details/134055898