• Django+Celery框架自动化定时任务开发


    本章介绍使用DjCelery即Django+Celery框架开发定时任务功能,在Autotestplat平台上实现单一接口自动化测试脚本、业务场景接口自动化测试脚本、App自动化测试脚本、Web自动化测试脚本等任务的定时执行、调度、管理等,从而取代Jenkins上的定时执行脚本和发送邮件等功能。**

    自动化测试逻辑流程图11.1所示。

    11.1 环境搭建

    1.安装

    步骤1 安装Celery。pyramid_celery-3.0.0,

    配置https://pypi.python.org/pypi/pyramid_celery/。

    步骤2 安装django-clery。django-celery-3.2.2,

    配置https://pypi.python.org/pypi/django- celery。 INSTALLED_APPS= []

    加入2:

    'djcelery', 运行 Python manage.py migrate

    步骤 3 安装celery-with-redis-3.0,

    地址为https://pypi.python.org/pypi/celery-with-redis/。

    步骤 4 安装django-clery-beat。django-celery-beat-1.1.0,

    https://pypi.python.org/pypi/ django_celery_beat。

    步骤5 下载Redis-x64-3.2.100,

    Redis-x64-3.2.100.zip github.com/MicrosoftAr…

    2.配置

    步骤1 在Settings.py中增加如下内容。

    加入1:

    import djcelery

    djcelery.setup_loader() #加载djcelery

    加入2:

    #数据库调度

    CELERYBEAT_SCHEDULER ='djcelery.schedulers.DatabaseScheduler'

    加入3:

    BROKER_URL = 'redis://127.0.0.1:6379/0'

    BROKER_TRANSPORT = 'redis'

    步骤2 在应用Apitest目录下新建celery.py文件1,加入如下内容。

    from future import absolute_import

    import os

    import django

    from celery import Celery

    from django.conf import settings

    os.environ.setdefault('DJANGO_SETTINGS_MODULE','autotest.settings')

    django.setup()

    app = Celery('autotest')

    app.config_from_object('django.conf:settings')

    app.autodiscover_tasks(lambda:settings.INSTALLED_APPS)

    步骤3 新建tasks.py文件,加入如下内容。

    --coding:utf-8 --

    importrequests, time, sys, re

    importurllib, zlib#,

    importpymysql

    importunittest

    from traceimport CoverageResults

    importjson

    fromidlelib.rpc import response_queue

    fromapitest.celery import app

    from timeimport sleep

    @app.task

    def hello_world():

    print('已运行')

    步骤4 启动服务python manage.py runserver。

    步骤5 解压缩后,运行CMD,切换到相应目录,输入启动Redis指令redis-server redis. windows.conf,成功后出现如图11.2所示界面。

    ▲图11.2

    步骤6 启动指令python manage.py celery worker -l info。

    步骤7 启动指令python manage.py celery beat。

    11.2 前端功能实现

    1.功能描述

    完成实现单一接口测试用例、业务场景接口API测试用例、AppUI测试用例、WebUI测试用例的自动化定时任务。

    2.程序清单

    在autotest\apitest\templates目录下新建periodic_task.html文件,加入如下内容。

    {% load bootstrap4 %}

    {% bootstrap_css %}

    {% bootstrap_javascript %}

    产品自动化测试平台

    {% csrf_token %}

    增加

    {% for task in tasks %}{% for periodic inperiodics %}

    {% if task.interval_id != null andtask.interval_id == periodic.id %}

    {% else %}

    {% endif %}

    {% for crontab in crontabs %}

    {% if task.crontab_id != null and task.crontab_id ==crontab.id and task.interval_id == 1 %}

    {% else %}

    {% endif %}

    {% endfor %}{% endfor %}{% endfor %}

    ID任务名称任务模块时间计划修改时间开启立即编辑删除
    {{ task.id }} {{ task.name }} {{ task.task }} 每{{ periodic.period }} {{ periodic.every}}次 {{ task.date_changed }} {{ task.enabled }} {% if task.id == 1 %}

    运行

    {% elif task.id == 2 %}

    运行

    {% else %}

    {% endif %}

    {{ task.id }} {{ task.name }} {{ task.task }} {{crontab.month_of_year }}年{{crontab.day_of_month }}月{{crontab.day_of_week }}日{{crontab.hour }}时{{ crontab.minute}}分 {{ task.date_changed }} {{ task.enabled }} 运行

    {# 把翻页功能固定显示在右下角#}

    总数{{ taskcounts }} {# 前端读取定义的变量#}

    1. <ulclass="pagination" id="pager">
    2. {#上一页链接开始#}
    3. {%if tasks.has_previous %}
    4. {# 如果有上一页,则正常显示上一页链接#}
    5. <li><ahref="/periodic_task/?page={{ tasks.previous_page_number }}">上一页</a></li> {# 上一页标签 #}
    6. {%else %}
    7. <li class="previous disabled"><ahref="#">上一页</a></li>{# 如果当前不存在上一页,则上一页的链接不可单击#}
    8. {%endif %}
    9. {# 上一页链接开始#}
    10. {%for num in tasks.paginator.page_range %}
    11. {% if num == currentPage %}
    12. <li><a href="/periodic_task/?page={{ num }}">{{ num}}</a></li> {#显示当前页数链接#}
    13. {% else %}
    14. <liclass="item"><a href="/periodic_task/?page={{ num}}">{{ num }}</a></li>
    15. {% endif %}
    16. {% endfor %}
    17. {# 下一页链接开始#}
    18. {% if tasks.has_next %} {# 如果有下一页,则正常显示下一页链接#}
    19. <liclass="next"><a href="/periodic_task/?page={{tasks.next_page_number }}">下一页</a></li>
    20. {% else %}
    21. <li><a href="#">下一页</a></li>
    22. {% endif %}
    23. {# 下一页链接结束#}
    24. </ul>

    功能描述:实现自动化测试任务调度执行,包括单一接口、扫描、流程接口、业务场景、Web搜索、自动化平台测试开发、App登录,CSDN定时任务注册,定时任务执行等功能。

    程序清单:在apitest/views.py中加入如下内容。

    from .tasks importhello_world

    from .tasks importtest_readSQLcase

    from djcelery.modelsimport PeriodicTask,CrontabSchedule,IntervalSchedule

    任务计划

    @login_required

    defperiodic_task(request):

    1. username = request.session.get('user', '')
    2. task_list = PeriodicTask.objects.all()
    3. task_count =PeriodicTask.objects.all().count() #统计数
    4. periodic_list =IntervalSchedule.objects.all() # 周期任务(如每隔1小时执行1次)
    5. crontab_list =CrontabSchedule.objects.all() # 定时任务(如某年某月某日的某时,每# 天的某时)
    6. paginator = Paginator(task_list, 5) #生成paginator对象,设置每页显示5条记录
    7. page = request.GET.get('page',1) #获取当前的页码数,默认为第1页
    8. currentPage=int(page) #把获取的当前页码数转换成整数类型
    9. try:
    10. task_list = paginator.page(page)#获取当前页码数的记录列表
    11. except PageNotAnInteger:
    12. task_list = paginator.page(1)#如果输入的页数不是整数,则显示第1页内容
    13. except EmptyPage:
    14. task_list =paginator.page(paginator.num_pages)#如果输入的页数不在系统的页数中,# 则显示最后一页内容
    15. return render(request,"periodic_task.html", {"user": username,"tasks":task_list,"taskcounts": task_count, "periodics":periodic_list,"crontabs": crontab_list })

    搜索功能

    @login_required

    deftasksearch(request):

    1. username = request.session.get('user', '')# 读取浏览器登录Session
    2. search_name =request.GET.get("task", "")
    3. task_list = PeriodicTask.objects.filter(task__icontains=search_name)
    4. periodic_list =IntervalSchedule.objects.all() # 周期任务(如每隔1小时执行1次)
    5. crontab_list =CrontabSchedule.objects.all() # 定时任务(如某年某月某日的某时,每# 天的某时)
    6. return render(request,'periodic_task.html',{"user": username,"tasks":task_list,"periodics":periodic_list,"crontabs": crontab_list })

    在autotest/urls.py中加入:

    path('periodic_task/',views.periodic_task),

    path('tasksearch/', views.tasksearch),
    

    在apitest/left.html中加入:

    1. <li>
    2. <a href="../periodic_task"target="mainFrame">
    3. <iclass="glyphicon glyphicon-fire"></i>
    4. 任务计划
    5. </a>
    6. </li>

    <tr><td>&nbsp;</td></tr>

    查看前端页面效果,如图11.3所示。

    ▲图11.3

    最后感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

    这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!

  • 相关阅读:
    Android 12.0 自定义仿小米全面屏手势导航左右手势滑动返回UI效果
    mapperXML标签总结
    【推荐算法】召回模型总结
    学习自旋电子学的笔记06:“扫参数”批量微磁模拟,ubermag介绍,微磁模拟求助
    Chrome扩展的核心:manifest 文件(上)
    Docker学习笔记
    TCP协议的三次握手及其原因
    快速幂算法
    Android应用开发架构之MVI:原理、流程与实战指南
    什么台灯最好学生晚上用?开学适合学生用的护眼台灯推荐
  • 原文地址:https://blog.csdn.net/2301_78276982/article/details/134269856