• Python 全栈系列253 再梳理flask-celery的搭建


    说明

    最近做了几个实验,将结论梳理一下,方便以后翻看。

    • 1 flask-celery 主要用于数据流的同步任务,其执行由flask-aps发起,基于IO并发的方法,达到资源的高效利用,满足业务上的需求。
    • 2 目前部署环境有算网机和anygpu

    内容

    1 环境

    现在看起来,将flask-celery部署在宿主机上是可行的,一般来说我会先安装miniconda3,有些容器本身就已经是anaconda3的就不动了。

    apt的升级是可选的

    sudo apt update
    sudo apt upgrade -y
    
    wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
    
    bash https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
    
    ~/miniconda3/bin/conda init
    
    source ~/.bashrc
    
    conda 24.4.0
    
    which conda
    /root/miniconda3/bin/conda
    

    然后,一般就在base环境下安装包,如果本身已经是容器环境(租用机有些不允许再安装软件),就直接安装相关python3包

    pip3 install ipython -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install requests -i https://mirrors.aliyun.com/pypi/simple/
    wget -NO Basefuncs-1.10-py3-none-any.whl http://YOURS/downup/view008_download_from_folder/pys.Basefuncs-1.10-py3-none-any.whl
    pip3 install Basefuncs-1.10-py3-none-any.whl
    pip3 install clickhouse_driver -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install pandas -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install numpy -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install redis -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install pydantic -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install nest_asyncio -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install aiohttp -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install Flask -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install Flask-APScheduler -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install celery -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install gunicorn -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install mongoengine -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install apscheduler -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install tornado -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install Pillow -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install markdown -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install pymysql -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install gevent -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install akshare -i https://mirrors.aliyun.com/pypi/simple/
    

    接下来需要在指定位置创建文件

    • 1 如果是在算网机,那么直接拉取项目文件,一般放在 /opt/project_notes/flask_celery 下面
    • 2 如果是在租用机,一般建立在/opt/flask_celery 下面,采用vim + 拷贝的方式
    systemd

    一般在宿主机级别可以有这个服务的控制,但是很多算力机也是没有的,只租用容器(怎么有种卖艺不卖身的感觉 - - !)。
    如果没有systemd,那么就需要切到对应目录下,直接用后台启动的方式

    直接用命令启动的方式,切换到对应目录下

    cd  /opt/project_notes/flask_celery 
    或
    cd /opt/flask_celery
    
    vim server_single_v2.py
    # 如果采用了非标的redis配置,需要到程序里面修改一下地址
    celery_broker = 'redis://:YOURS@127.0.0.1:24008/1'
    
    # 前台启动命令
    gunicorn server_single_v2:app -b 0.0.0.0:24104
    celery -A server_single_v2.celery_ worker
    

    写的时候刚打开,一堆积压的任务立即扑面而来。
    在这里插入图片描述
    我突然意识到celery的异步可能是线程级,而不是协程级的。(因为worker编号看起来很像我的cpu核数编号)
    在这里插入图片描述
    算了,先放过自己 。用一段时间看看,之后可以自己创建一个协程级的服务来进行流转和调度。我主要的应用应该就是在于异步请求API。我想以后有一块很大的改造就是将取数/查询部分的服务都使用异步进行优化。可以参考tornado同步转异步几种方式,先从搭建异步tornado开始。

    编辑启动脚本vim ~/start_flask_celery.sh。理论上需要source才能activate,虽然即使不activate也可以运行,但我猜主要是因为在服务里面写了conda环境,所以默认进入了base。

    #!/bin/bash
    # 激活 base 环境(或你创建的特定环境)
    source /root/miniconda3/etc/profile.d/conda.sh
    conda activate base
    
    #!/bin/bash
    #anaconda环境
    
    # 运行 Python 服务脚本
    # 算网机
    cd /opt/project_notes/flask_celery 
    # 租用机
    # cd /opt/flask_celery
    
    nohup gunicorn server_single_v2:app -b 0.0.0.0:24104 >/dev/null 2>&1 &
    nohup celery -A server_single_v2.celery_ worker >/dev/null 2>&1 &
    

    如果没有systemd,那么就开机的时候手动执行一下脚本,一般还是尽量注册为systemd服务,注意一下两种不同环境的配置。
    先修改脚本执行权限 chmod +x ~/start_flask_celery.shvim /lib/systemd/system/flask_celery.service

    [Unit]
    Description=flask_celery_service
    After=network.target network-online.target syslog.target
    Wants=network.target network-online.target
    
    [Service]
    #启动服务的命令
    Type=forking
    ExecStart=/root/start_flask_celery.sh
    Restart=always
    RestartSec=5
    # miniconda
    Environment="PATH=/root/miniconda3/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
    # anaconda
    #Environment="PATH=/root/anaconda3/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
    
    [Install]
    WantedBy=multi-user.target
    

    然后配置重载、启动和自启动

    systemctl daemon-reload
    systemctl start flask_celery
    systemctl enable flask_celery
    systemctl status flask_celery
    
    ┌─root@m7:~
    └─ $ systemctl status  flask_celery
    ● flask_celery.service - flask_celery_service
       Loaded: loaded (/lib/systemd/system/flask_celery.service; enabled; vendor preset: enabled)
       Active: active (running) since Sat 2024-06-15 13:03:51 CST; 1s ago
      Process: 428 ExecStart=/root/start_flask_celery.sh (code=exited, status=0/SUCCESS)
        Tasks: 38 (limit: 4915)
       CGroup: /system.slice/flask_celery.service
               ├─475 /root/miniconda3/bin/python /root/miniconda3/bin/gunicorn server_single_v2:app -b 0.0.0.0:24104
               ├─476 /root/miniconda3/bin/python /root/miniconda3/bin/celery -A server_single_v2.celery_ worker
               ├─556 /root/miniconda3/bin/python /root/miniconda3/bin/gunicorn server_single_v2:app -b 0.0.0.0:24104
               ├─864 /root/miniconda3/bin/python /root/miniconda3/bin/celery -A server_single_v2.celery_ worker
               ├─924 /root/miniconda3/bin/python /root/miniconda3/bin/celery -A server_single_v2.celery_ worker
               ├─949 /root/miniconda3/bin/python /root/miniconda3/bin/celery -A server_single_v2.celery_ worker
               └─975 /root/miniconda3/bin/python /root/miniconda3/bin/celery -A server_single_v2.celery_ worker
    
    615 13:03:51 m7 systemd[1]: Starting flask_celery_service...
    615 13:03:51 m7 systemd[1]: Started flask_celery_service.
    

    测试

    import requests as req 
    
    para_dict = {'arg1':111,
                 'arg2':222}
    
    resp = req.post('http://127.0.0.1:24104/sum_post/',json = para_dict )
    print('每个任务耗时10秒')
    import time 
    tick1 = time.time()
    print(req.get('http://127.0.0.1:24104/get_result/%s' % resp.text).text)
    tick2 = time.time()
    print('actually takes %.2f' % (tick2 - tick1 ))
    
    每个任务耗时10333
    actually takes 10.02
    
    
  • 相关阅读:
    如何通过Navicat导入sql文件
    【蓝桥杯物联网赛项学习日志】Day1 点亮led
    后量子时代,未来密码该何去何从?
    基于Docker_Nginx+LVS+Flask+MySQL的高可用Web集群
    【Flink源码】Flink程序启动的那些事儿
    WPF 快捷键关闭当前窗口 KeyDown(事件)
    【LeetCode力扣】189 53 轮转数组 | 最大子数组和
    RibbonPage
    Spring整合Mybatis和Junit小案例(9)
    Python之字符串判断
  • 原文地址:https://blog.csdn.net/yukai08008/article/details/139697360