• python基本调度工具Apscheduler用法


    一款基于python 库 自带的调度任务工具…,个人觉得比crontab好使,主要是填写时间方便

    1-Apscheduler 部分
    1-1 顺序执行
    #先安装
    #pip install apscheduler
    #jupyter 环境下就 !pip install apscheduler
    #实例my_apscheduler.py
    from apscheduler.schedulers.background import BlockingScheduler
    from apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutor
    from datetime import datetime
    import time,os
    from pytz import utc
    
    #名为“default”的 ThreadPoolExecutor,工作线程计数为 20
    #名为“processpool”的 ProcessPoolExecutor,工作器计数为 5
    #执行器
    executors = {
        "default": ThreadPoolExecutor(20),#默认线程数
        "processpool": ProcessPoolExecutor(5)
    }
    
    #默认情况下,新作业的合并处于关闭状态
    #新作业的默认最大实例限制为 3
    job_defaults = {
        "coalesce": False,
        "max_instances": 3
    }
    
    tt=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    
    #实例化调度器
    #UTC 作为调度程序的时区
    scheduler = BlockingScheduler(executors=executors, job_defaults=job_defaults,timezone=utc)
    
    #创建一个任务
    def my_job():  # 任务 一天调度一次,
    	print(tt)#打印时间
    	cm0="python root/train.py"
    	cm1="python test.py"
    	os.system(cm0)
    	os.system(cm1)
    
    scheduler.add_job(my_job, 'cron', hour=0,minute=1)#每天的0点1分,cron是定时定点跑
    #scheduler.add_job(my_job, 'interval', minutes = 1)#每分钟跑
    #开启调度
    scheduler.start()  
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    1-2 并行执行

    Popen()函数返回的是一个Popen对象,而不是像os.system()函数一样返回一个命令执行结果的返回值。如果需要获取命令执行结果,可以使用Popen.communicate()函数来实现。

    import subprocess
    #创建一个任务
    def my_job():
        tt = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        print(tt)
        
        cmd1 = ["python", "E:\\Pycharm_project\\Sche_task\\20230428\\recognize_target_type.py"]
        cmd2 = ["python", "E:\\Pycharm_project\\Sche_task\\20230428\\split_task_update_status_send_request.py"]
        
        #subprocess.Popen()函数中使用了stdout=subprocess.PIPE和stderr=subprocess.PIPE参数,表示将子进程输出重定向到管道中,并使用communicate()方法读取子进程输出的结果。
        p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        p2 = subprocess.Popen(cmd2, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        
        out1, err1 = p1.communicate()#获取输出结果
        out2, err2 = p2.communicate()#方法读取子进程输出的结果
        
        print(out1.decode('utf-8'))
        print(err1.decode('utf-8'))
        print(out2.decode('utf-8'))
        print(err2.decode('utf-8'))
    #要获取命令执行结果,可以使用subprocess.Popen()函数的communicate()方法。
    #该方法会等待子进程完成并返回一个元组,
    #其中第一个元素是子进程输出的标准输出结果,第二个元素是子进程输出的标准错误结果。
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    并发执行

    import concurrent.futures
    import os
    
    def my_job():
        tt = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        print(tt)
    
        # 使用 ThreadPoolExecutor 来并行执行 cm0 和 cm1
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_cm0 = executor.submit(os.system, "python root/train.py")
            future_cm1 = executor.submit(os.system, "python test.py")
    
        # 等待两个任务完成
        concurrent.futures.wait([future_cm0, future_cm1])
    
    scheduler.add_job(my_job, 'cron', hour=0, minute=1)
    scheduler.start()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    如果需要等待两个命令都执行完成后才结束调度,可以在my_job()函数中使用subprocess.Popen()函数的wait()方法

    import subprocess
    
    def my_job():
        tt = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        print(tt)
        
        cmd1 = ["python", "E:\\Pycharm_project\\Sche_task\\20230428\\recognize_target_type.py"]
        cmd2 = ["python", "E:\\Pycharm_project\\Sche_task\\20230428\\split_task_update_status_send_request.py"]
        
        p1 = subprocess.Popen(cmd1)
        p2 = subprocess.Popen(cmd2)
        
        p1.wait()
        p2.wait()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    2-Infinite loop scheduling
    #死循环调度
    import os
    import time
    import datetime
    
    def time_printer():
        now = datetime.datetime.now()
        ts = now.strftime('%Y-%m-%d %H:%M:%S')
        print("do func time:",ts)
        
    while True:
        time_printer()
        cmd0 = 'python /my.py'
        os.system(cmd0)
      
        print("%s 已经执行完毕"%(cmd0))
        
        time.sleep(300)#每5分钟调度
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    2-Scheduling script

    有时候你的调度脚本会因为意外挂掉,给他一个重启的脚本

    #案列schedu_task.sh 查看进程,如果挂掉后3秒重启该调度脚本
    #!/bin/sh
    while true;
    do
        processExist=`ps aux|grep my_apscheduler.py |grep -v grep`
        echo "$processExist"
        
        if [ "$processExist" == "" ];
        then
    	 echo 'py file have been finished'
    	 echo 'process has been restarted!'
     	 `nohup python3 /root/my_apscheduler.py >> my_apscheduler.log 2>&1 &`
        else
             echo 'process already started!'
        fi
    sleep 3
    done
    
    #运行
     nohup bash /root/my_schduler.sh &
    #不写shell 脚本 直接后台执行调度文件
    nohup python -u /root/my_apscheduler.py > out_put.log 2>&1 &
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    说明

    • 使用ps aux命令列出所有正在运行的进程,并将其输出通过管道传递给grep your_file.py命令进行匹配。
    • 使用grep -v grep过滤掉匹配中包含grep关键字的行。
    • 将匹配结果赋值给processExist变量。
    • 输出$processExist以查看是否存在匹配到的进程。
    • 如果变量为空字符串,则表示没有找到匹配的进程,输出相应信息并重启进程。
    • 如果变量不为空,则表示进程已经在运行,输出相应信息。

    第二种方式处理

    #!/bin/bash
    
    while true; do
        # 检查进程是否在运行
        if pgrep -f "python your_file.py" >/dev/null; then# /dev/null 静默处理 
            echo "进程正在运行"
        else
            echo "进程挂掉了!重新启动中..."
            # 重启进程的操作
            python your_file.py &
        fi
    
        sleep 5  # 可以根据需要调整检测的时间间隔
    done
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    解释说明

    • pgrep -f “python your_file.py”:这是一个使用pgrep命令的形式,通过参数-f(full)和一个匹配模式来查找包含指定字符串的进程。在这里,我们使用"python your_file.py"作为匹配模式,表示查找包含该字符串的进程。如果找到匹配的进程,pgrep命令将返回进程的PID(进程ID);否则,它将不会有任何输出。

    • /dev/null:这是一个重定向操作,将前面命令的标准输出(stdout)重定向到/dev/null设备文件中。/dev/null是一个特殊的设备文件,直接丢弃所有写入它的内容。在这里,我们将pgrep命令的输出重定向到/dev/null,因此不会在终端上显示输出信息。

    • if语句:这是一个条件语句,用于根据pgrep命令的结果进行判断。如果pgrep命令返回了任何输出(即找到了匹配的进程),则条件为真,执行if语句后面的代码块。否则,条件为假,执行if语句后面的其他代码块(如果有)。

    3-Generic mysql writes to the database
    #将清洗或得到的结果插入mysql数据库
    #链接mysql
    def conn_sql():
    	test_mysql = {"host":"IP",'user': '用户名','password':mima
    	             ,'charset': 'utf8', 'port':端口号,'connect_timeout':10
    	        }
    	conn = pymysql.connect(**test_mysql)
    	return conn
    	
    def insert_sql(result, tablename):
        import pymysql
        conn = conn_sql()
        cs = conn.cursor()  # 获取游标
        list_columns = result.columns
        list_columns = tuple(list_columns)
        list1 = []
        for i in range(0, len(result.columns)):
            list1.append('%s')
        list1_values = tuple(list1)
        sql = """insert into '%s'%s' values%s""" % (tablename, list_columns, list1_values)
        sql = sql.replace("'", "")
        result6 = result.apply(lambda x: tuple([i for i in x]), axis=1)
        res = list(result6)
        cs.executemany(sql, res)
        conn.commit()
        cs.close()
        conn.close()
    
    
    # 删除重新插入的数据
    def delete_sql_in(result, insert_tablename):
        # conn = pymysql.connect(**mysql2)
        conn = conn_sql()
        sql_c = """select * from {} where date_time=DATE(NOW())""".format(insert_tablename)
        df_c = pd.read_sql(sql_c, conn)
        if df_c.empty == False:
            del_sql = """delete from {} WHERE date_time =DATE(NOW())""".format(insert_tablename)
            cs = conn.cursor()  # 获取游标
            cs.execute(del_sql)
            conn.commit()
            insert_sql(result, insert_tablename)
            print("写入已完成")
        else:
            insert_sql(result, insert_tablename)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    记录就是为了能cv绝不手写的原则…
    了解更多用法可以浏览Apscheduler官方链接

  • 相关阅读:
    Linux--shell脚本详解
    Apollo中的身份验证与授权:保护你的数据
    读取 resources 目录下的文件路径的几种方法
    2_JavaScript面试题
    保密工作应党而生、伴党而行、为党而兴
    18.4 【Linux】systemd-journald.service 简介
    docker 上传镜像
    创建型设计模式 原型模式 建造者模式 创建者模式对比
    SpringBoot Aop使用篇
    python入门基础(14)--类的属性、成员方法、静态方法以及继承、重载
  • 原文地址:https://blog.csdn.net/Elvis__c/article/details/127557737