- APScheduler==3.10.4
- eventlet==0.33.3
- Flask==2.1.3
- Flask-Caching==1.10.1
- Flask-Cors==3.0.10
- Flask-Migrate==2.7.0
- Flask-RESTful==0.3.9
- Flask-SocketIO==5.1.1
- Flask-SQLAlchemy==2.5.1
- PyJWT==2.3.0
- PyMySQL==1.0.2
- redis==3.5.3
- SQLAlchemy==1.4.0 #额外修改
- Werkzeug==2.0.2 #额外修改
注意:在flask2.x版本依赖,不再支持flask_script了
flask2.x版本会自动注册 flask run 和flask db 两个命令行命令
1、启动项目
flask run --host 0.0.0.0 --port 9000
2、数据库迁移命令
flask db init
flask db migrate
flask db upgrade
apps
__init__.py : 创建app应用,各种注册
websocket
consumers.py #将所有socketio处理类放到这里
base
settings.py
ext
__init__.py: 拓展对象都放在这里
config.py : 拓展对象的配置内容
app.py
settings.py
- import os
- import datetime
- def get_database(dic):
- '"mysql+pymysql://root:Huawei@123@localhost:3306/study_flask?charset=utf8"'
- engine = dic.get('ENGINE')
- driver = dic.get('DRIVER')
- user = dic.get('USER')
- host = dic.get('HOST')
- password = dic.get("PASSWORD")
- port = dic.get('PORT')
- name = dic.get('NAME')
- dbinfo = f"{engine}+{driver}://{user}:{password}@{host}:{port}/{name}?charset=utf8"
- return dbinfo
-
- #全局通用配置类
- class Config:
- """项目配置核心类"""
- DEBUG=True
- LOG_LEVEL = "INFO"
- SECRET_KEY= '8hdj^sasdas6736475#$#5&GHG'
- BASE_PATH = os.path.dirname(os.path.abspath(__file__))
- STATIC_PATH = os.path.join(BASE_PATH, 'static') #static/ 路由对应的目录
- TEMPLATES_PATH = os.path.join(BASE_PATH, 'templates') # 用于专门检索静态文件的位置,方便修改
- # 中文乱码
- JSON_AS_ASCII = False
-
- # # 配置redis
- # # 项目上线以后,这个地址就会被替换成真实IP地址,mysql也是
- # REDIS_HOST = 'your host'
- # REDIS_PORT = your port
- # REDIS_PASSWORD = 'your password'
- # REDIS_POLL = 10
- #数据库配置
- dbinfo={
- 'ENGINE':'mysql',
- 'DRIVER':'pymysql',
- 'USER':'root',
- 'PASSWORD':'123456',
- 'HOST':"127.0.0.1",
- "PORT":"3306",
- 'NAME':'flask_obj'
- }
- # 数据库连接格式
- SQLALCHEMY_DATABASE_URI = get_database(dbinfo)
- # 动态追踪修改设置,如未设置只会提示警告
- SQLALCHEMY_TRACK_MODIFICATIONS = False
- # 查询时会显示原始SQL语句
- SQLALCHEMY_ECHO = False
- # 数据库连接池的大小
- SQLALCHEMY_POOL_SIZE=100
- #指定数据库连接池的超时时间
- SQLALCHEMY_POOL_TIMEOUT=30
- # 控制在连接池达到最大值后可以创建的连接数。当这些额外的 连接回收到连接池后将会被断开和抛弃。
- SQLALCHEMY_MAX_OVERFLOW=2
-
- #配置日志时,需要设置False,只能flask才能捕获移除写到日志文件中
- PROPAGATE_EXCEPTIONS = False
-
- #配置时区
- TIMEZONE = local_timezone = datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo
-
-
- class DevConfig(Config):
- DEBUG = True
-
- class TestConfig(Config):
- DEBUG = True
-
- class Online(Config):
- DEBUG = False
-
- envs = {
- 'dev':DevConfig,
- 'test':TestConfig,
- 'online':Online,
- 'default':Config
- }
1、ext/__init__.py
- from flask_sqlalchemy import SQLAlchemy
- from flask_cors import CORS
- from flask_restful import Api
- from flask_caching import Cache
- from flask_socketio import SocketIO
-
- db = SQLAlchemy() #数据库对象
- cors = CORS() #跨域
- cache = Cache() #缓存
- socketio = SocketIO()#websocket对象
2、ext/config.py
- #跨域的配置
- cors_config = {
- "origins": "*", #所有域都允许
- "expose_headers": ["Content-Type", "token","x-requested-with"], #跨域请求头允许的
- "methods": ["GET", "POST","PUT","PATCH","OPTIONS","DELETE"], #跨域允许的请求方式
- "supports_credentials": True, #允许在cookies跨域
- }
-
-
- #cache缓存配置-使用redis数据库
- cache_config_redis = {
- 'CACHE_TYPE':'redis',
- 'CACHE_REDIS_HOST':'127.0.0.1',
- 'CACHE_REDIS_PORT':6637,
- # 'CACHE_REDIS_PASSWORD':'密码', #如果redis配置了密码
- 'CACHE_REDIS_DB':0, #指定使用的redis的db,默认0-15
- }
-
- #cache缓存配置-使用内存
- cache_config_mem = {
- 'CACHE_TYPE':'simple'#使用内存作为cache
- }
1、__init__.py
- import logging
- import eventlet
- from flask import Flask,jsonify
-
- #导入跨域对象
- from ext import cors
- #导入db对象
- from ext import db
- #导入cache
- from ext import cache
- #导入socketio
- from ext import socketio
- #导入拓展的配置内容
- from ext.config import cache_config_redis,cache_config_mem,cors_config
-
- #导入中间件
- from base.middleware import after_request #响应前执行的中间件
-
- #导入配置字典
- from base.settings import envs
-
- #导入日志
- from base.logger import getLogHandlerFile
- from base.logger import getLogHanderTime
-
- #定时任务
- from base.scheduler import SchedulerManage
-
- #注册socketio的命名空间
- from apps.websocket.consumers import TotalWebsocketNamespace
-
- #导入蓝图
- from apps.user.urls import user_bp
-
-
- def create_app():
- #创建一个flask实例,传递__name__ ,是把当前文路径作为flask实例的根路径
- #static和templates都是创建在该路径下的
- app = Flask(__name__,static_folder='../static',template_folder='../templates') #static目录位置是上层的static
- eventlet.monkey_patch() # 开启补丁机制
-
- '基本配置'
- #导入配置从类中
- app.config.from_object(envs.get('default'))
-
- '日志配置'
- app.logger.addHandler(getLogHanderTime()) #基于时间
- app.logger.addHandler(getLogHandlerFile()) #基于文件大小
- app.logger.setLevel(logging.INFO)
-
- '中间件'
- #每次响应前都先设置好响应头,做好跨域
- app.after_request(after_request) #【1、使用中间件解决跨域】
- #执行请求处理前的中间件,
- # app.before_request(before_request)
-
- '拓展配置'
- #配置db对象 将db对象与app进行绑定,orm对象与app绑定
- db.init_app(app)
- #配置跨域,supports_credentials=True 允许携带cookies等信息
- cors.init_app(app,**cors_config) #【2、使用三方扩展解决跨域】
- #配置缓存
- cache.init_app(app=app,config=cache_config_mem)
- #配置socketio
- socketio.init_app(app,cors_allowed_origins='*')
-
- 'socketio注册命名空间的位置: 必须在这个py文件中注册'
- socketio.on_namespace(TotalWebsocketNamespace('/')) #使用默认的全局名称空间
-
- '蓝图注册'
- app.register_blueprint(user_bp)
-
- '异常处理'
- @app.errorhandler(Exception)
- def handle_exception(e):
- # 将异常错误写到日志文件中
- app.logger.exception(str(e))
- # print(e, type(e))
- # 对异常错误的响应,使用api的格式
- return jsonify(code=500, message=str(e)), 500
- '定时任务'
- # SchedulerManage()
-
- return app
在本文中,最重要的就是将socketio命名空间的注册,放到create_app函数中了。
2、websocket/consumers.py
- from flask import render_template, request, jsonify
- from flask_socketio import SocketIO, send, emit, join_room
- from ext import socketio
- from flask_socketio import Namespace
- '''
- 一、非群聊功能,前端需要实时更新某些数据使用
- 1、返回html页面
- 2、主动发送websocket到后端,后端返回数据给请求的用户
- 3、调用某个视图函数,在视图函数中,给所有连接推送新的数据
- '''
-
- class TotalWebsocketNamespace(Namespace):
- def on_handle_data(self,data):
- print(data, '接收浏览器发送的数据')
- # 1、给发送给后端的websocket,发送数据,单独给这个websocket发送
- # socketio.emit('handle_data', {'data':'返回的数据','type':'user','msg':'单独返回'})
- emit('handle_data', {'data': '返回的数据', 'type': 'user', 'msg': '主动请求时,返回的数据'})
-
- def on_connect(self):
- print('connect连接')
- token = request.args.get('token')
- sid = request.sid
- print(request.args, 'args')
- # print('连接的sid',request.sid)
-
- if token == '123456':
- socketio.emit('success', '验证token成功')
- join_room('default') # 加入到默认的房间中了
- # 表明连接成功
- else:
- print('token验证失败')
- # 阻止连接
- return False
如何将socketio处理函数集中管理,将所有的处理类集中到一个py文件中。
1、创建一个包,在包中创建一个py文件,将所有处理socketio的类都写在这里
2、在创建flask的app应用中,注册socketio的命名空间:在create_app函数中,注册命名空间
- 'socketio注册命名空间的位置: 必须在这个py文件中注册'
- socketio.on_namespace(TotalWebsocketNamespace('/')) #使用默认的全局名称空间
建议采用这种方式来管理socketio,在后续的维护会比较容易,拓展起来也简单。不建议使用函数的方式,这样就必须将函数都写在app.py文件中,不然后端无法监听到前端发送的websocket的请求。
1、直接在consumers.py同级目录下创建一个routings.py,将命名空间的注册写到该py文件中,这样后端是无法接收到前端的websocket请求的。
2、使用函数的方式来处理逻辑,且把函数都集中放到了websocket/consumers.py文件中,这样后端是无法接收到前端的websocket请求的。