笔记本win10,i5-6200U,16G
python3.8
pandas1.3.5
在一张接近45M的csv表格,63.7w行数据,从中提取符合时间段范围的(每天20到第二天9点的数据),并且信号数值小于一定范围的数据,接着按每天为一份,去重统计和提取这些数据的原始详细信息
def date_clean(file: str, time_interval: tuple, distance: tuple):
"""
按时间区间,信号范围来提取出数据
:param file: 读取csv文件
:param time_interval: 开始小时和结束小时,元组类型(20,9)20点到9点
:param distance: 开始结束信号,元组类型(500,30000) 500到30000
:return:
"""
df_list = [] # 存储筛选好的数据,先存列表或字典,最后再转为DataFrame能缩短一半时间
print(f"loading... {datetime.datetime.now()}")
data_frame = pd.read_csv(file, encoding='utf-8', dtype="string[pyarrow]") # 防止读取长数值时,用的是科学计数法导致精度丢失
print(f"加载完毕... {datetime.datetime.now()}")
start_time = datetime.time(time_interval[0], 0, 0)
end_time = datetime.time(time_interval[1], 0, 0)
if time_interval[0] > time_interval[1]: # 如果时间区间是跨天
for ind, row in data_frame.iterrows():
if (datetime.time(time_interval[0], 0, 0) <= pd.to_datetime(row["采集时间"]).time() or
pd.to_datetime(row["采集时间"]).time() <= datetime.time(time_interval[1], 0, 0)): # 只比较小时:
if distance[0] <= int(float(row["信号"])) <= distance[1]:
#print(data_frame.loc[[row]])
#print(row)
df_list.append(row.to_list())
else:
pass
df = data_frame.reset_index(drop=True)
#df = pd.DataFrame(df_list, columns=["数据", "地点", "采集时间", "归属地", "信号"]) # to_list()后没有了表头,需要手动添加
# .reset_index()列表里存的是Series,他还有一组与数组数据对应的标签索引。转为DataFrame后需要重设index
#print(df)
print(f"依据时间和信号值过滤的数据比为 {len(df)}/{len(data_frame)}")
print(f"数据清理... {datetime.datetime.now()}")
subsection_count_filt(df, time_interval, 28)
def subsection_count_filt(full_data: pd.DataFrame, time_interval: tuple, times: int):
"""
按时间段划分为一段数据区间,在从去重后的唯一值列表去筛选出这段区间内出现的次数
:param unique_data: 去重后的唯一值列表
:param full_data: 全量数据
:param time_interval: 时间范围区间,(开始时间,结束时间)
:param times: 筛选出现的次数 大于等于
:return:
"""
full_result_list = [] # 保存统计次数的结果
full_result_details = pd.DataFrame() # 保存结果的详情内容
dt = full_data["采集时间"].str.split(' ', expand=True) # 将年月日 时分秒拆分成两列
# print(dt)
# full_data["年月日"] = dt[0]
# full_data["时分秒"] = dt[1]
day_times = dt[0].drop_duplicates().to_list() # 获取表格中包含的日期
day_times.sort()
for day in day_times: # 按时间归为一组
print(day)
day_time1 = datetime.datetime(int(day.split('-')[0]), int(day.split('-')[1]), int(day.split('-')[2]), time_interval[0], 0, 0)
day_time2 = datetime.datetime(int(day.split('-')[0]), int(day.split('-')[1]), int(day.split('-')[2]), time_interval[1], 0, 0) + datetime.timedelta(days=1)
df_day_list = [] # 分类一天的数据
to_drop = []
have_data = False # 判断插入了数据
#for row in range(0, len(full_data)):
for index, row in full_data.iterrows():
if day_time1 <= pd.to_datetime(row["采集时间"]) <= day_time2:
# 一天内的数据汇总统计
df_day_list.append(row.to_list())
to_drop.append(index)
have_data = True
# 删除已不需要的后能加快速度下次遍历速度
full_data = full_data.drop(to_drop) # 快了1/3
if have_data:
df_day = pd.DataFrame(df_day_list, columns=["数据", "地点", "采集时间", "归属地", "信号"])
one_day_counts = df_day["数据"].value_counts() # 计数
one_day_result = one_day_counts[one_day_counts >= times] # 筛选出现次数大于n次的
result = pd.DataFrame({'数据': one_day_result.index, '次数': one_day_result.values})
# print(result)
for ind, row in result.iterrows():
full_result_list.append({"日期": day, "数据": row["数据"], "次数": row["次数"]})
full_result_details = pd.concat([full_result_details, (df_day[df_day["数据"] == row["数据"]])], ignore_index=True) # 从一天数据中检索并插入详情,concat拼接
# print(full_result)
full_result = pd.DataFrame(full_result_list)
# 结果去重
print("结果去重")
print(full_result.drop_duplicates(subset="数据").reset_index())
运行时间 900s+,时间太长了,查资料说用pandas2.0之后的,可以用pyarrow来提高速率,但是2.0很多方法不适用了,影响到之前的一些脚本运行,后来仔细想了想,是遍历那里的时间复杂度太大了,提取出结果的数据,大概要遍历10亿次,决定从这里的循环逻辑做改善
之前一直难到的点是,觉得时间这一列要提取出来转换成time后才能作比较,现在找到直接转换成时间而不保留日期的函数了
def date_clean(file: str, time_interval: tuple, distance: tuple):
"""
按时间区间,信号范围来提取出数据
:param file: 读取csv文件
:param time_interval: 开始小时和结束小时,元组类型(20,9)20点到9点
:param distance: 开始结束信号,元组类型(500,30000) 500到30000
:return:
"""
df_list = [] # 存储筛选好的数据,先存列表或字典,最后再转为DataFrame能缩短一半时间
print(f"loading... {datetime.datetime.now()}")
data_frame = pd.read_csv(file, encoding='utf-8', dtype="string[pyarrow]") # 防止读取长数值时,用的是科学计数法导致精度丢失
print(f"加载完毕... {datetime.datetime.now()}")
start_time = datetime.time(time_interval[0], 0, 0)
end_time = datetime.time(time_interval[1], 0, 0)
if time_interval[0] > time_interval[1]: # 如果时间区间是跨天
# !!! 取消用一行行遍历的方式,来减少时间复杂度,直接根据列值来筛选后使用
data_frame["小时间"] = pd.to_datetime(data_frame["采集时间"]).dt.time # 只提取小时
data_frame["信号"] = pd.to_numeric(data_frame["信号"]) # 转换数值行
data_frame = data_frame.loc[(data_frame["小时间"] >= start_time) | (data_frame["小时间"] <= end_time)] # 跨天时间用 或
data_frame = data_frame.loc[(data_frame["信号"] >= float(distance[0])) & (data_frame["信号"] <= float(distance[1]))]
data_frame.drop(['小时间'], axis=1, inplace=True) # 删除小时
else:
pass
df = data_frame.reset_index(drop=True)
#print(df)
print(f"依据时间和信号值过滤的数据比为 {len(df)}/{len(data_frame)}")
print(f"数据清理... {datetime.datetime.now()}")
# print(f"再次去重后 {len(df_unique)}/{find_num}")
subsection_count_filt(df, time_interval, 28)
def subsection_count_filt(full_data: pd.DataFrame, time_interval: tuple, times: int):
"""
按时间段划分为一段数据区间,在从去重后的唯一值列表去筛选出这段区间内出现的次数
:param unique_data: 去重后的唯一值列表
:param full_data: 全量数据
:param time_interval: 时间范围区间,(开始时间,结束时间)
:param times: 筛选出现的次数 大于等于
:return:
"""
full_result_list = [] # 保存统计次数的结果
full_result_details = pd.DataFrame() # 保存结果的详情内容
dt = full_data["采集时间"].str.split(' ', expand=True) # 将年月日 时分秒拆分成两列
day_times = dt[0].drop_duplicates().to_list() # 获取表格中包含的日期
day_times.sort()
full_data["采集时间"] = pd.to_datetime(full_data["采集时间"]) # 转换为时间格式
for day in day_times: # 按时间归为一组
print(day)
# 划分一组的开始时间和结束时间
day_time1 = datetime.datetime(int(day.split('-')[0]), int(day.split('-')[1]), int(day.split('-')[2]), time_interval[0], 0, 0)
day_time2 = datetime.datetime(int(day.split('-')[0]), int(day.split('-')[1]), int(day.split('-')[2]), time_interval[1], 0, 0) + datetime.timedelta(days=1)
# 取消使用上面的遍历模式
df_day = full_data.loc[(full_data["采集时间"] >= day_time1) & (full_data["采集时间"] <= day_time2)] # 具体日期时间用 与
if len(df_day) != 0: # 有数据
df_day = df_day.reset_index(drop=True)
one_day_counts = df_day["数据"].value_counts() # 计数
one_day_result = one_day_counts[one_day_counts >= times] # 筛选出现次数大于n次的
result = pd.DataFrame({'数据': one_day_result.index, '次数': one_day_result.values})
# print(result)
for ind, row in result.iterrows():
full_result_list.append({"日期": day, "数据": row["数据"], "次数": row["次数"]})
full_result_details = pd.concat([full_result_details, (df_day[df_day["数据"] == row["数据"]])], ignore_index=True) # 从一天数据中检索并插入详情,concat拼接
# print(full_result)
full_result = pd.DataFrame(full_result_list)
# 结果去重
print("结果去重")
print(full_result.drop_duplicates(subset="数据").reset_index())
经过这种方法后,运算时间在30s+,快了30倍啊!!!,所以有时候运行慢,找到不是系统io读写文件这种问题后,考虑一下程序的时间复杂度