• flask框架-[实现websocket]:将socketio处理函数部分集中管理,使用类的方式来管理,集中管理socketio处理函数


    一、项目依赖

    1. APScheduler==3.10.4
    2. eventlet==0.33.3
    3. Flask==2.1.3
    4. Flask-Caching==1.10.1
    5. Flask-Cors==3.0.10
    6. Flask-Migrate==2.7.0
    7. Flask-RESTful==0.3.9
    8. Flask-SocketIO==5.1.1
    9. Flask-SQLAlchemy==2.5.1
    10. PyJWT==2.3.0
    11. PyMySQL==1.0.2
    12. redis==3.5.3
    13. SQLAlchemy==1.4.0 #额外修改
    14. Werkzeug==2.0.2 #额外修改

    注意:在flask2.x版本依赖,不再支持flask_script了

    flask2.x版本会自动注册 flask run 和flask db 两个命令行命令

    1、启动项目

    flask run --host 0.0.0.0 --port 9000

    2、数据库迁移命令

    flask db init

    flask db migrate

    flask db upgrade

    二、项目结构

    apps

            __init__.py  : 创建app应用,各种注册

            websocket

                    consumers.py   #将所有socketio处理类放到这里

    base

            settings.py

    ext

            __init__.py:  拓展对象都放在这里

            config.py : 拓展对象的配置内容

    app.py

    三、具体代码使用

    settings.py

    1. import os
    2. import datetime
    3. def get_database(dic):
    4. '"mysql+pymysql://root:Huawei@123@localhost:3306/study_flask?charset=utf8"'
    5. engine = dic.get('ENGINE')
    6. driver = dic.get('DRIVER')
    7. user = dic.get('USER')
    8. host = dic.get('HOST')
    9. password = dic.get("PASSWORD")
    10. port = dic.get('PORT')
    11. name = dic.get('NAME')
    12. dbinfo = f"{engine}+{driver}://{user}:{password}@{host}:{port}/{name}?charset=utf8"
    13. return dbinfo
    14. #全局通用配置类
    15. class Config:
    16. """项目配置核心类"""
    17. DEBUG=True
    18. LOG_LEVEL = "INFO"
    19. SECRET_KEY= '8hdj^sasdas6736475#$#5&GHG'
    20. BASE_PATH = os.path.dirname(os.path.abspath(__file__))
    21. STATIC_PATH = os.path.join(BASE_PATH, 'static') #static/ 路由对应的目录
    22. TEMPLATES_PATH = os.path.join(BASE_PATH, 'templates') # 用于专门检索静态文件的位置,方便修改
    23. # 中文乱码
    24. JSON_AS_ASCII = False
    25. # # 配置redis
    26. # # 项目上线以后,这个地址就会被替换成真实IP地址,mysql也是
    27. # REDIS_HOST = 'your host'
    28. # REDIS_PORT = your port
    29. # REDIS_PASSWORD = 'your password'
    30. # REDIS_POLL = 10
    31. #数据库配置
    32. dbinfo={
    33. 'ENGINE':'mysql',
    34. 'DRIVER':'pymysql',
    35. 'USER':'root',
    36. 'PASSWORD':'123456',
    37. 'HOST':"127.0.0.1",
    38. "PORT":"3306",
    39. 'NAME':'flask_obj'
    40. }
    41. # 数据库连接格式
    42. SQLALCHEMY_DATABASE_URI = get_database(dbinfo)
    43. # 动态追踪修改设置,如未设置只会提示警告
    44. SQLALCHEMY_TRACK_MODIFICATIONS = False
    45. # 查询时会显示原始SQL语句
    46. SQLALCHEMY_ECHO = False
    47. # 数据库连接池的大小
    48. SQLALCHEMY_POOL_SIZE=100
    49. #指定数据库连接池的超时时间
    50. SQLALCHEMY_POOL_TIMEOUT=30
    51. # 控制在连接池达到最大值后可以创建的连接数。当这些额外的 连接回收到连接池后将会被断开和抛弃。
    52. SQLALCHEMY_MAX_OVERFLOW=2
    53. #配置日志时,需要设置False,只能flask才能捕获移除写到日志文件中
    54. PROPAGATE_EXCEPTIONS = False
    55. #配置时区
    56. TIMEZONE = local_timezone = datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo
    57. class DevConfig(Config):
    58. DEBUG = True
    59. class TestConfig(Config):
    60. DEBUG = True
    61. class Online(Config):
    62. DEBUG = False
    63. envs = {
    64. 'dev':DevConfig,
    65. 'test':TestConfig,
    66. 'online':Online,
    67. 'default':Config
    68. }

    3.1、ext配置

    1、ext/__init__.py

    1. from flask_sqlalchemy import SQLAlchemy
    2. from flask_cors import CORS
    3. from flask_restful import Api
    4. from flask_caching import Cache
    5. from flask_socketio import SocketIO
    6. db = SQLAlchemy() #数据库对象
    7. cors = CORS() #跨域
    8. cache = Cache() #缓存
    9. socketio = SocketIO()#websocket对象

    2、ext/config.py

    1. #跨域的配置
    2. cors_config = {
    3. "origins": "*", #所有域都允许
    4. "expose_headers": ["Content-Type", "token","x-requested-with"], #跨域请求头允许的
    5. "methods": ["GET", "POST","PUT","PATCH","OPTIONS","DELETE"], #跨域允许的请求方式
    6. "supports_credentials": True, #允许在cookies跨域
    7. }
    8. #cache缓存配置-使用redis数据库
    9. cache_config_redis = {
    10. 'CACHE_TYPE':'redis',
    11. 'CACHE_REDIS_HOST':'127.0.0.1',
    12. 'CACHE_REDIS_PORT':6637,
    13. # 'CACHE_REDIS_PASSWORD':'密码', #如果redis配置了密码
    14. 'CACHE_REDIS_DB':0, #指定使用的redis的db,默认0-15
    15. }
    16. #cache缓存配置-使用内存
    17. cache_config_mem = {
    18. 'CACHE_TYPE':'simple'#使用内存作为cache
    19. }

    3.2、apps配置

    1、__init__.py

    1. import logging
    2. import eventlet
    3. from flask import Flask,jsonify
    4. #导入跨域对象
    5. from ext import cors
    6. #导入db对象
    7. from ext import db
    8. #导入cache
    9. from ext import cache
    10. #导入socketio
    11. from ext import socketio
    12. #导入拓展的配置内容
    13. from ext.config import cache_config_redis,cache_config_mem,cors_config
    14. #导入中间件
    15. from base.middleware import after_request #响应前执行的中间件
    16. #导入配置字典
    17. from base.settings import envs
    18. #导入日志
    19. from base.logger import getLogHandlerFile
    20. from base.logger import getLogHanderTime
    21. #定时任务
    22. from base.scheduler import SchedulerManage
    23. #注册socketio的命名空间
    24. from apps.websocket.consumers import TotalWebsocketNamespace
    25. #导入蓝图
    26. from apps.user.urls import user_bp
    27. def create_app():
    28. #创建一个flask实例,传递__name__ ,是把当前文路径作为flask实例的根路径
    29. #static和templates都是创建在该路径下的
    30. app = Flask(__name__,static_folder='../static',template_folder='../templates') #static目录位置是上层的static
    31. eventlet.monkey_patch() # 开启补丁机制
    32. '基本配置'
    33. #导入配置从类中
    34. app.config.from_object(envs.get('default'))
    35. '日志配置'
    36. app.logger.addHandler(getLogHanderTime()) #基于时间
    37. app.logger.addHandler(getLogHandlerFile()) #基于文件大小
    38. app.logger.setLevel(logging.INFO)
    39. '中间件'
    40. #每次响应前都先设置好响应头,做好跨域
    41. app.after_request(after_request) #【1、使用中间件解决跨域】
    42. #执行请求处理前的中间件,
    43. # app.before_request(before_request)
    44. '拓展配置'
    45. #配置db对象 将db对象与app进行绑定,orm对象与app绑定
    46. db.init_app(app)
    47. #配置跨域,supports_credentials=True 允许携带cookies等信息
    48. cors.init_app(app,**cors_config) #【2、使用三方扩展解决跨域】
    49. #配置缓存
    50. cache.init_app(app=app,config=cache_config_mem)
    51. #配置socketio
    52. socketio.init_app(app,cors_allowed_origins='*')
    53. 'socketio注册命名空间的位置: 必须在这个py文件中注册'
    54. socketio.on_namespace(TotalWebsocketNamespace('/')) #使用默认的全局名称空间
    55. '蓝图注册'
    56. app.register_blueprint(user_bp)
    57. '异常处理'
    58. @app.errorhandler(Exception)
    59. def handle_exception(e):
    60. # 将异常错误写到日志文件中
    61. app.logger.exception(str(e))
    62. # print(e, type(e))
    63. # 对异常错误的响应,使用api的格式
    64. return jsonify(code=500, message=str(e)), 500
    65. '定时任务'
    66. # SchedulerManage()
    67. return app

    在本文中,最重要的就是将socketio命名空间的注册,放到create_app函数中了。

    2、websocket/consumers.py

    1. from flask import render_template, request, jsonify
    2. from flask_socketio import SocketIO, send, emit, join_room
    3. from ext import socketio
    4. from flask_socketio import Namespace
    5. '''
    6. 一、非群聊功能,前端需要实时更新某些数据使用
    7. 1、返回html页面
    8. 2、主动发送websocket到后端,后端返回数据给请求的用户
    9. 3、调用某个视图函数,在视图函数中,给所有连接推送新的数据
    10. '''
    11. class TotalWebsocketNamespace(Namespace):
    12. def on_handle_data(self,data):
    13. print(data, '接收浏览器发送的数据')
    14. # 1、给发送给后端的websocket,发送数据,单独给这个websocket发送
    15. # socketio.emit('handle_data', {'data':'返回的数据','type':'user','msg':'单独返回'})
    16. emit('handle_data', {'data': '返回的数据', 'type': 'user', 'msg': '主动请求时,返回的数据'})
    17. def on_connect(self):
    18. print('connect连接')
    19. token = request.args.get('token')
    20. sid = request.sid
    21. print(request.args, 'args')
    22. # print('连接的sid',request.sid)
    23. if token == '123456':
    24. socketio.emit('success', '验证token成功')
    25. join_room('default') # 加入到默认的房间中了
    26. # 表明连接成功
    27. else:
    28. print('token验证失败')
    29. # 阻止连接
    30. return False

    四、总结

    如何将socketio处理函数集中管理,将所有的处理类集中到一个py文件中。

    1、创建一个包,在包中创建一个py文件,将所有处理socketio的类都写在这里

    2、在创建flask的app应用中,注册socketio的命名空间:在create_app函数中,注册命名空间

    1. 'socketio注册命名空间的位置: 必须在这个py文件中注册'
    2. socketio.on_namespace(TotalWebsocketNamespace('/')) #使用默认的全局名称空间

    建议采用这种方式来管理socketio,在后续的维护会比较容易,拓展起来也简单。不建议使用函数的方式,这样就必须将函数都写在app.py文件中,不然后端无法监听到前端发送的websocket的请求。

    五、错误写法

    1、直接在consumers.py同级目录下创建一个routings.py,将命名空间的注册写到该py文件中,这样后端是无法接收到前端的websocket请求的。

    2、使用函数的方式来处理逻辑,且把函数都集中放到了websocket/consumers.py文件中,这样后端是无法接收到前端的websocket请求的。

  • 相关阅读:
    C Primer Plus(6) 中文版 第7章 C控制语句:分支和跳转 7.5 条件运算符 ?:
    OS-process
    08.SCA-CNN
    C语言编程小知识分享,希望对你有用
    PUPANVR-LVGL UI主菜单及设置窗体框架(9)
    springboot基于微信小程序的电器商城系统的设计与实现毕业设计源码251453
    契约锁乔迁新址,欢迎新老朋友来坐坐
    查询方法需要使用事务吗
    数据链路层-------以太网协议
    多线程知识点总结之温故而知新
  • 原文地址:https://blog.csdn.net/weixin_46371752/article/details/133702494