• python_定时任务自动匹配数据II


    从配置文件读取配置参数,并每天定时执行日常任务、每周一定时执行周任务,匹配符合要求的GGA数据,文件查找-复制-合并-裁剪。

    定时任务运行的日志:

    1. import os
    2. import re
    3. import shutil
    4. import threading
    5. import datetime
    6. import time
    7. import schedule
    8. # 全局变量:配置文件
    9. glob_config = "config_nrtk.txt"
    10. def txt_to_dict(filename):
    11. with open(filename, 'r', encoding='UTF-8-sig') as f: # 读取的文件或者写入文件时会出现”\ufeff”非法字符,需要改变编码方式‘UTF-8‘为‘UTF-8-sig‘
    12. data_dict = {}
    13. for line in f:
    14. # 忽略空行和注释行
    15. if line.strip() == '' or line.startswith('#'):
    16. continue
    17. key, value = line.strip().split('=')
    18. data_dict[key] = value
    19. return data_dict
    20. def date_to_th(date_t):
    21. # 获取今年的第一天
    22. first_day = datetime.date(date_t.year, 1, 1)
    23. # 计算当前天是今年的第几天
    24. day_of_year = (date_t - first_day).days + 1
    25. # 获取当前天的月份
    26. month = date_t.month
    27. # 将月份转换为英文,保留3个字符
    28. month_name = ["Unknown", "January", "February", "March", "April", "May", "June",
    29. "July", "August", "September", "October", "November", "December"][month][:3]
    30. print("当前天是{}月。".format(month_name))
    31. print("当前天是今年的第{}天。".format(day_of_year))
    32. return day_of_year, month_name
    33. def copy_exchange(src_path, dst_path, dt, pattern_start="120000.00", pattern_end="235959.00"):
    34. # 文件命名
    35. now = datetime.datetime.now()
    36. fn = re.sub(r'-', '', dt)
    37. pp = pattern_start.split(".")
    38. fnn = pp[0]
    39. # 区分不同环境的pos文件
    40. if "NRTK_PROD" in src_path:
    41. print("PROD环境的NRTK文件")
    42. file_name = "NRTK_PROD_" + fn + '_' + fnn + now.strftime('_%H%M%S') + '.org'
    43. # elif "PROD_V2" in src_path:
    44. # print("PROD_V2环境的SSR文件")
    45. # file_name = "PROD_" + fn + '_' + fnn + now.strftime('_%H%M%S') + '.pos'
    46. # elif "TEST_V2_P" in src_path:
    47. # print("TEST_V2_P环境的SSR文件")
    48. # file_name = "TEST_P_" + fn + '_' + fnn + now.strftime('_%H%M%S') + '.pos'
    49. # elif "TEST_V2" in src_path:
    50. # print("TEST_V2环境的SSR文件")
    51. # file_name = "TEST_" + fn + '_' + fnn + now.strftime('_%H%M%S') + '.pos'
    52. else:
    53. print("非PROD和TEST环境的SSR文件")
    54. file_name = fn + '_' + fnn + now.strftime('_%H%M%S') + '.org'
    55. # 复制文件
    56. dst_path_c = os.path.join(dst_path, file_name)
    57. shutil.copy2(src_path, dst_path_c)
    58. # 写入目标文件
    59. dst_w_path = os.path.join(dst_path, 'N_' + file_name)
    60. with open(dst_path_c, 'r') as dst, open(dst_w_path, 'w') as dst_w:
    61. # 默认不写入文件
    62. writing = False
    63. num_count = 0
    64. for line in dst:
    65. # 正则匹配,起始行
    66. if re.search(pattern_start, line) and line.startswith('$GPZDA'):
    67. writing = True
    68. if writing:
    69. num_count += 1
    70. dst_w.write(line)
    71. # 正则匹配,截止行
    72. if re.search(pattern_end, line): # and line.startswith('$GPGGA'):
    73. writing = False
    74. # break
    75. if num_count == 0:
    76. print(r'当前时间:%s,没有数据匹配' % (now.strftime('%Y-%m-%d %H:%M:%S')))
    77. else:
    78. print(r'当前时间:%s,总共%s行写入完成,起始于:%s,截止于:%s' % (
    79. now.strftime('%Y-%m-%d %H:%M:%S'), num_count, pattern_start, pattern_end))
    80. if dst_path_c:
    81. os.remove(dst_path_c)
    82. def nrtk_daily_job():
    83. # 获取今天的日期
    84. today = datetime.date.today()
    85. # 获取昨天的日期
    86. yesterday = today - datetime.timedelta(days=1)
    87. # print("昨天的日期是:", yesterday)
    88. day_y, month_y = date_to_th(yesterday)
    89. # 读取配置文件,加载字典
    90. data = txt_to_dict(glob_config)
    91. # 获取特定键值对的值
    92. origin_dir = data.get('origin_dir', 'null')
    93. backup_dir = data.get('backup_dir', 'null')
    94. file_prefix_prod_nrtk = data.get('file_prefix_prod_nrtk', 'null')
    95. # 拼接完整的org文件名称
    96. file_name = file_prefix_prod_nrtk + str(day_y * 10) + ".org"
    97. # 拼接读取的文件路径
    98. origin_dir_all = os.path.join(origin_dir, month_y, file_name)
    99. # 拼接昨天的起始和截止关键字
    100. day_y_format = time_exchange(str(yesterday))
    101. pattern_start_y = "120000.00," + str(day_y_format)
    102. day_y_format = time_exchange(str(yesterday))
    103. pattern_end_y = "235959.00," + str(day_y_format)
    104. if origin_dir_all:
    105. copy_exchange(origin_dir_all, backup_dir, str(yesterday), pattern_start_y, pattern_end_y)
    106. else:
    107. print(r'当前时间:%s,%s文件不存在' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), origin_dir_all))
    108. def nrtk_weekly_job():
    109. # 获取今天的日期
    110. today = datetime.date.today()
    111. # 获取今天的偏移量
    112. idx = (today.weekday() + 1) % 7 # MON = 0, SUN = 6 -> SUN = 0 .. SAT = 6
    113. # 获取上周五、六、日的日期
    114. fri = today - datetime.timedelta(idx + 2)
    115. sun = today - datetime.timedelta(idx + 1)
    116. sat = today - datetime.timedelta(idx)
    117. # print(fri, sun, sat)
    118. day_fri, month_fri = date_to_th(fri)
    119. day_sun, month_sun = date_to_th(sun)
    120. day_sat, month_sat = date_to_th(sat)
    121. # 读取配置文件,加载字典
    122. data = txt_to_dict(glob_config)
    123. # 获取特定键值对的值
    124. origin_dir = data.get('origin_dir', 'null')
    125. backup_dir = data.get('backup_dir', 'null')
    126. file_prefix_prod_nrtk = data.get('file_prefix_prod_nrtk', 'null')
    127. # 拼接完整的org文件名称
    128. file_name_fri = file_prefix_prod_nrtk + str(day_fri * 10) + ".org"
    129. origin_dir_all_fri = os.path.join(origin_dir, month_fri, file_name_fri)
    130. file_name_sun = file_prefix_prod_nrtk + str(day_sun * 10) + ".org"
    131. origin_dir_all_sun = os.path.join(origin_dir, month_sun, file_name_sun)
    132. file_name_sat = file_prefix_prod_nrtk + str(day_sat * 10) + ".org"
    133. origin_dir_all_sat = os.path.join(origin_dir, month_sat, file_name_sat)
    134. # print(origin_dir_all_fri, origin_dir_all_sun, origin_dir_all_sat)
    135. try:
    136. # 合并文件
    137. output_file = os.path.join(backup_dir, file_prefix_prod_nrtk + "_merge.org")
    138. merge_file = merge_files(origin_dir_all_fri, origin_dir_all_sun, origin_dir_all_sat, output_file)
    139. # 拼接周六的起始和截止关键字
    140. day_fri_format = time_exchange(str(fri))
    141. pattern_start_sun = "160000.00," + str(day_fri_format)
    142. day_sun_format = time_exchange(str(sun))
    143. pattern_end_sun = "155959.00," + str(day_sun_format)
    144. # 拼接周日的起始和截止关键字
    145. # day_sun_format = time_exchange(str(sun))
    146. pattern_start_sat = "160000.00," + str(day_sun_format)
    147. day_sat_format = time_exchange(str(sat))
    148. pattern_end_sat = "155959.00," + str(day_sat_format)
    149. copy_exchange(merge_file, backup_dir, str(sun), pattern_start_sun, pattern_end_sun)
    150. time.sleep(5)
    151. copy_exchange(merge_file, backup_dir, str(sat), pattern_start_sat, pattern_end_sat)
    152. if merge_file:
    153. os.remove(merge_file)
    154. except Exception as e:
    155. print(e)
    156. def time_exchange(date_str):
    157. # YYYY-MM-DD格式转换成DD,MM,YYYY
    158. date_obj = datetime.datetime.strptime(date_str, "%Y-%m-%d")
    159. formatted_date = date_obj.strftime("%d,%m,%Y")
    160. return formatted_date
    161. def merge_files(file1, file2, file3, output_file):
    162. with open(output_file, 'w') as outfile:
    163. with open(file1) as infile1:
    164. outfile.write(infile1.read())
    165. with open(file2) as infile2:
    166. outfile.write(infile2.read())
    167. with open(file3) as infile3:
    168. outfile.write(infile3.read())
    169. return output_file
    170. def run_threading(job_func):
    171. # 多线程并行运行
    172. job_thread = threading.Thread(target=job_func)
    173. job_thread.start()
    174. if __name__ == '__main__':
    175. # 读取配置文件,加载字典
    176. data = txt_to_dict(glob_config)
    177. # 获取运行时间
    178. run_time_daily = data.get('run_time_daily', '08:05')
    179. run_time_weekly = data.get('run_time_weekly', '00:05')
    180. # 创建日常任务调度器
    181. schedule_daily = schedule.Scheduler()
    182. # 每天定时调度日常任务
    183. schedule_daily.every().day.at(run_time_daily).do(run_threading, nrtk_daily_job)
    184. # schedule.every(2).minutes.do(run_threading, job_test) # 每两分钟
    185. # schedule.every(2).minutes.do(run_threading, job_prod) # 每两分钟
    186. # 创建周任务调度器
    187. schedule_weekly = schedule.Scheduler()
    188. # 每周的星期一定时调度周任务
    189. schedule_weekly.every().monday.at(run_time_weekly).do(run_threading, nrtk_weekly_job)
    190. # 立即执行所有任务
    191. schedule_daily.run_all()
    192. schedule_weekly.run_all()
    193. while True:
    194. schedule_daily.run_pending()
    195. schedule_weekly.run_pending()
    196. time.sleep(1)

    config配置文件:

    # 日常任务执行时间-北京时间(格式:小时:分钟)
    run_time_daily=08:05
    
    
    # 周任务执行时间-北京时间(每周一定时执行)
    run_time_weekly=00:05
    
    
    # 读取org文件的外层目录
    origin_dir=E:\data\RefData.23
    
    
    # 读取org文件的文件名前缀
    file_prefix_prod_nrtk=GGA_NRTK_PROD_Incoming_
    file_prefix_prod_s2o=GGA_S2O_PROD_Incoming_
    file_prefix_test_s2o=GGA_S2O_TEST_Incoming_
    
    
    # 备份文件的路径
    backup_dir=E:\data\RefData.23\备份

  • 相关阅读:
    输赢只是一时
    JVM垃圾回收
    万物皆可集成系列:低代码对接企企云实现数据集成
    地图与WebGIS、地图的作用、数字地图的应用
    DSA8300 泰克 Tektronix 数字采样示波器 简述
    MySQL读取的记录和我想象的不一致——事物隔离级别和MVCC
    vite+Vue3+ts之框架搭建过程与踩坑记录(持续补充)
    C++ Primer学习笔记-----第三章:字符串、向量、数组
    Apache Jmeter测压工具快速入门
    BPF bpf_spin_lock 使用排错指南
  • 原文地址:https://blog.csdn.net/ramsey17/article/details/134420532