从配置文件读取配置参数,并每天定时执行日常任务、每周一定时执行周任务,匹配符合要求的GGA数据,文件查找-复制-合并-裁剪。
定时任务运行的日志:

- import os
- import re
- import shutil
- import threading
- import datetime
- import time
-
- import schedule
-
- # 全局变量:配置文件
- glob_config = "config_nrtk.txt"
-
-
- def txt_to_dict(filename):
- with open(filename, 'r', encoding='UTF-8-sig') as f: # 读取的文件或者写入文件时会出现”\ufeff”非法字符,需要改变编码方式‘UTF-8‘为‘UTF-8-sig‘
- data_dict = {}
- for line in f:
- # 忽略空行和注释行
- if line.strip() == '' or line.startswith('#'):
- continue
- key, value = line.strip().split('=')
- data_dict[key] = value
- return data_dict
-
-
- def date_to_th(date_t):
- # 获取今年的第一天
- first_day = datetime.date(date_t.year, 1, 1)
- # 计算当前天是今年的第几天
- day_of_year = (date_t - first_day).days + 1
- # 获取当前天的月份
- month = date_t.month
- # 将月份转换为英文,保留3个字符
- month_name = ["Unknown", "January", "February", "March", "April", "May", "June",
- "July", "August", "September", "October", "November", "December"][month][:3]
- print("当前天是{}月。".format(month_name))
- print("当前天是今年的第{}天。".format(day_of_year))
-
- return day_of_year, month_name
-
-
- def copy_exchange(src_path, dst_path, dt, pattern_start="120000.00", pattern_end="235959.00"):
- # 文件命名
- now = datetime.datetime.now()
- fn = re.sub(r'-', '', dt)
- pp = pattern_start.split(".")
- fnn = pp[0]
-
- # 区分不同环境的pos文件
- if "NRTK_PROD" in src_path:
- print("PROD环境的NRTK文件")
- file_name = "NRTK_PROD_" + fn + '_' + fnn + now.strftime('_%H%M%S') + '.org'
- # elif "PROD_V2" in src_path:
- # print("PROD_V2环境的SSR文件")
- # file_name = "PROD_" + fn + '_' + fnn + now.strftime('_%H%M%S') + '.pos'
- # elif "TEST_V2_P" in src_path:
- # print("TEST_V2_P环境的SSR文件")
- # file_name = "TEST_P_" + fn + '_' + fnn + now.strftime('_%H%M%S') + '.pos'
- # elif "TEST_V2" in src_path:
- # print("TEST_V2环境的SSR文件")
- # file_name = "TEST_" + fn + '_' + fnn + now.strftime('_%H%M%S') + '.pos'
- else:
- print("非PROD和TEST环境的SSR文件")
- file_name = fn + '_' + fnn + now.strftime('_%H%M%S') + '.org'
-
- # 复制文件
- dst_path_c = os.path.join(dst_path, file_name)
- shutil.copy2(src_path, dst_path_c)
-
- # 写入目标文件
- dst_w_path = os.path.join(dst_path, 'N_' + file_name)
- with open(dst_path_c, 'r') as dst, open(dst_w_path, 'w') as dst_w:
- # 默认不写入文件
- writing = False
- num_count = 0
- for line in dst:
- # 正则匹配,起始行
- if re.search(pattern_start, line) and line.startswith('$GPZDA'):
- writing = True
- if writing:
- num_count += 1
- dst_w.write(line)
- # 正则匹配,截止行
- if re.search(pattern_end, line): # and line.startswith('$GPGGA'):
- writing = False
- # break
- if num_count == 0:
- print(r'当前时间:%s,没有数据匹配' % (now.strftime('%Y-%m-%d %H:%M:%S')))
- else:
- print(r'当前时间:%s,总共%s行写入完成,起始于:%s,截止于:%s' % (
- now.strftime('%Y-%m-%d %H:%M:%S'), num_count, pattern_start, pattern_end))
-
- if dst_path_c:
- os.remove(dst_path_c)
-
-
- def nrtk_daily_job():
- # 获取今天的日期
- today = datetime.date.today()
- # 获取昨天的日期
- yesterday = today - datetime.timedelta(days=1)
- # print("昨天的日期是:", yesterday)
- day_y, month_y = date_to_th(yesterday)
-
- # 读取配置文件,加载字典
- data = txt_to_dict(glob_config)
-
- # 获取特定键值对的值
- origin_dir = data.get('origin_dir', 'null')
- backup_dir = data.get('backup_dir', 'null')
- file_prefix_prod_nrtk = data.get('file_prefix_prod_nrtk', 'null')
-
- # 拼接完整的org文件名称
- file_name = file_prefix_prod_nrtk + str(day_y * 10) + ".org"
-
- # 拼接读取的文件路径
- origin_dir_all = os.path.join(origin_dir, month_y, file_name)
-
- # 拼接昨天的起始和截止关键字
- day_y_format = time_exchange(str(yesterday))
- pattern_start_y = "120000.00," + str(day_y_format)
- day_y_format = time_exchange(str(yesterday))
- pattern_end_y = "235959.00," + str(day_y_format)
-
- if origin_dir_all:
- copy_exchange(origin_dir_all, backup_dir, str(yesterday), pattern_start_y, pattern_end_y)
- else:
- print(r'当前时间:%s,%s文件不存在' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), origin_dir_all))
-
-
- def nrtk_weekly_job():
- # 获取今天的日期
- today = datetime.date.today()
- # 获取今天的偏移量
- idx = (today.weekday() + 1) % 7 # MON = 0, SUN = 6 -> SUN = 0 .. SAT = 6
- # 获取上周五、六、日的日期
- fri = today - datetime.timedelta(idx + 2)
- sun = today - datetime.timedelta(idx + 1)
- sat = today - datetime.timedelta(idx)
- # print(fri, sun, sat)
- day_fri, month_fri = date_to_th(fri)
- day_sun, month_sun = date_to_th(sun)
- day_sat, month_sat = date_to_th(sat)
-
- # 读取配置文件,加载字典
- data = txt_to_dict(glob_config)
-
- # 获取特定键值对的值
- origin_dir = data.get('origin_dir', 'null')
- backup_dir = data.get('backup_dir', 'null')
- file_prefix_prod_nrtk = data.get('file_prefix_prod_nrtk', 'null')
-
- # 拼接完整的org文件名称
- file_name_fri = file_prefix_prod_nrtk + str(day_fri * 10) + ".org"
- origin_dir_all_fri = os.path.join(origin_dir, month_fri, file_name_fri)
- file_name_sun = file_prefix_prod_nrtk + str(day_sun * 10) + ".org"
- origin_dir_all_sun = os.path.join(origin_dir, month_sun, file_name_sun)
- file_name_sat = file_prefix_prod_nrtk + str(day_sat * 10) + ".org"
- origin_dir_all_sat = os.path.join(origin_dir, month_sat, file_name_sat)
- # print(origin_dir_all_fri, origin_dir_all_sun, origin_dir_all_sat)
-
- try:
- # 合并文件
- output_file = os.path.join(backup_dir, file_prefix_prod_nrtk + "_merge.org")
- merge_file = merge_files(origin_dir_all_fri, origin_dir_all_sun, origin_dir_all_sat, output_file)
-
- # 拼接周六的起始和截止关键字
- day_fri_format = time_exchange(str(fri))
- pattern_start_sun = "160000.00," + str(day_fri_format)
- day_sun_format = time_exchange(str(sun))
- pattern_end_sun = "155959.00," + str(day_sun_format)
-
- # 拼接周日的起始和截止关键字
- # day_sun_format = time_exchange(str(sun))
- pattern_start_sat = "160000.00," + str(day_sun_format)
- day_sat_format = time_exchange(str(sat))
- pattern_end_sat = "155959.00," + str(day_sat_format)
-
- copy_exchange(merge_file, backup_dir, str(sun), pattern_start_sun, pattern_end_sun)
- time.sleep(5)
- copy_exchange(merge_file, backup_dir, str(sat), pattern_start_sat, pattern_end_sat)
-
- if merge_file:
- os.remove(merge_file)
-
- except Exception as e:
- print(e)
-
-
-
-
- def time_exchange(date_str):
- # YYYY-MM-DD格式转换成DD,MM,YYYY
- date_obj = datetime.datetime.strptime(date_str, "%Y-%m-%d")
- formatted_date = date_obj.strftime("%d,%m,%Y")
-
- return formatted_date
-
-
- def merge_files(file1, file2, file3, output_file):
- with open(output_file, 'w') as outfile:
- with open(file1) as infile1:
- outfile.write(infile1.read())
- with open(file2) as infile2:
- outfile.write(infile2.read())
- with open(file3) as infile3:
- outfile.write(infile3.read())
-
- return output_file
-
-
- def run_threading(job_func):
- # 多线程并行运行
- job_thread = threading.Thread(target=job_func)
- job_thread.start()
-
-
- if __name__ == '__main__':
- # 读取配置文件,加载字典
- data = txt_to_dict(glob_config)
- # 获取运行时间
- run_time_daily = data.get('run_time_daily', '08:05')
- run_time_weekly = data.get('run_time_weekly', '00:05')
-
- # 创建日常任务调度器
- schedule_daily = schedule.Scheduler()
-
- # 每天定时调度日常任务
- schedule_daily.every().day.at(run_time_daily).do(run_threading, nrtk_daily_job)
- # schedule.every(2).minutes.do(run_threading, job_test) # 每两分钟
- # schedule.every(2).minutes.do(run_threading, job_prod) # 每两分钟
-
- # 创建周任务调度器
- schedule_weekly = schedule.Scheduler()
-
- # 每周的星期一定时调度周任务
- schedule_weekly.every().monday.at(run_time_weekly).do(run_threading, nrtk_weekly_job)
-
- # 立即执行所有任务
- schedule_daily.run_all()
- schedule_weekly.run_all()
-
- while True:
- schedule_daily.run_pending()
- schedule_weekly.run_pending()
- time.sleep(1)
config配置文件:
# 日常任务执行时间-北京时间(格式:小时:分钟) run_time_daily=08:05 # 周任务执行时间-北京时间(每周一定时执行) run_time_weekly=00:05 # 读取org文件的外层目录 origin_dir=E:\data\RefData.23 # 读取org文件的文件名前缀 file_prefix_prod_nrtk=GGA_NRTK_PROD_Incoming_ file_prefix_prod_s2o=GGA_S2O_PROD_Incoming_ file_prefix_test_s2o=GGA_S2O_TEST_Incoming_ # 备份文件的路径 backup_dir=E:\data\RefData.23\备份