单一模式运行Celery在官方文档中已经贴出范例代码,这里我们不过多介绍。
在使用Flask编写大型项目时,使用工厂模式的好处显而易见。因在Celery4.x以上版本已经抛弃了init_app方法,所以我们传统的在create_app中初始化并传入app对象的方式已经不可用了。我们改变方式在create_app中直接创建Celery对象就会导致循环导入问题,该如何解决呢?下面我们来一步步操作。
一、首先贴出我的项目结构:
简单介绍一下项目结构(重点说一下Celery组成):
apis:项目接口存放文件夹
apps:Flask项目传统文件夹,不在赘述
---admin:模型文件夹
---decorators.py:自定义装饰器
---tasks.py:Celery任务文件,存放所有调度执行任务
ext:第三方扩展文件夹,创建第三方扩展对象。例如数据库、api、缓存、跨域等等
---celery.py:Celery主文件,创建celery对象,后期引用@celery.task()主要在这
migrations:数据库迁移等文件夹
static:静态文件文件夹
templates:模版文件夹
app.py:项目主启动文件
celery_settings.py:Celery配置文件
settings.py:项目主配置文件
二、实现步骤
1.下载Celery
1 | pip install celery |
2.编写celery配置
1 2 3 4 5 6 | broker_url = 'redis://127.0.0.1:6379/0' result_backend = 'redis://127.0.0.1:6379/0' task_serializer = 'json' result_serializer = 'json' accept_content = [ 'json' ] broker_connection_retry_on_startup = True |
3.生成Celery对象
在celery.py中编写:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | def create_celery(app = None ): celery = Celery(app.import_name, backend = broker_url, broker = result_backend) celery.conf.update(app.config) celery.config_from_object( 'celery_settings' ) class ContextTask(celery.Task): def __call__( self , * args, * * kwargs): with app.app_context(): return self .run( * args, * * kwargs) celery.Task = ContextTask return celery |
4.编写任务
最后我们就可以在tasks.py中编写调度任务
1 2 3 4 5 6 | celery = create_celery(create_app()) @celery .task() def insert_sql(): for i in range ( 10 ): print (i) time.sleep( 1 ) |
然后我们在需要执行调度任务的代码处增加启动代码:
1 | insert_sql.delay() |
5.启动celery
最后不要忘记启动celery,然后就可以在执行到启动任务代码时,执行异步任务。
以上就是我们如何在Flask工厂模式中使用Celery范例了。
新手上路,如有疏漏错误,还请各位大佬不吝赐教。