• flask-socketio实现websocket通信


    flask-socketio能够实现客户端和服务器之间的低延迟双向通信。客户端可以使用js/c++/等任何官方支持的客户端库建立与服务器的长连接。

    安装

    pip install flask-socketio 
    
    • 1

    要求

    flask-socketio与python3.6+兼容,依赖的异步服务可以从以下三种选择其一。

    • eventlet 是性能最佳的选项,支持长轮询和websocket传输。
    • gevent 性能略低于eventlet,且没有原生websocket支持,需要安装gevent-websocket或使用带websocket功能的uWSGI的web服务器。
    • 基于Werkzeug的Flask开发服务器。此种仅用于简单开发测试。

    [socketio官方文档](Introduction — Flask-SocketIO documentation)

    初始化

    from flask import Flask
    from flask_socketio import SocketIO
    
    app = Flask(__name__)
    app.config['SECRET_KEY'] = 'secret!'
    socketio = SocketIO(app)
    
    if __name__ == '__main__':socketio.run(app, host='0.0.0.0', debug=True) 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    以上用socketio.run代替了app.run

    注册连接/断开/消息事件

    @socketio.on("connect", namespace="/transaction_cycle_life")
    def connect():app.logger.info('connect------')
    
    @socketio.on("disconnect", namespace="/transaction_cycle_life")
    def disconnect():app.logger.info("disconnect -------------------")@socketio.on("message", namespace="/transaction_cycle_life")
    def message(data):app.logger.info(data) 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    以上配合客户端就可以实现双向的通信了,可以用emit发送消息到客户端

    实现一个简单的循环查询并发送给客户端

    from app import socketio, emit, app
    from flask import request
    
    
    tasks = {}
    
    class MonitorTask(object):def __init__(self, username, sid):self._running = Trueself.username = usernameself.sid = siddef terminate(self):self._running = Falsedef get_data(self):return {"msg": f"hahaha------{self.username}"}def run(self):while self._running:data = self.get_data()app.logger.info(data)# 非上下文,使用sid回传给原来的客户端socketio.emit("response", {"msg": data}, namespace='/transaction_cycle_life', room=self.sid)socketio.sleep(3)
    
    @socketio.on("join", namespace="/transaction_cycle_life")
    def join(data):username = data.get('username')# 有上下文,emit只会发给触发事件的客户端emit("response", {"msg": username})task = MonitorTask(username, request.sid)thread = socketio.start_background_task(task.run)tasks[request.sid] = {'task': task, 'thread': thread}app.logger.info(tasks)
    
    @socketio.on("leave", namespace="/transaction_cycle_life")
    def leave(*args):tasks[request.sid]['task'].terminate()app.logger("trigger leave ----------")del tasks[request.sid]emit("disconnect")
    
    @socketio.on("message", namespace="/transaction_cycle_life")
    def message(data):app.logger.info(data)
    
    @socketio.on("connect", namespace="/transaction_cycle_life")
    def connect():app.logger.info('connect------')
    
    @socketio.on("disconnect", namespace="/transaction_cycle_life")
    def disconnect():try:tasks[request.sid]['task'].terminate()del tasks[request.sid]app.logger.info("disconnect -------------------")except KeyError as ke:app.logger.info("key error pass") 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    上面的代码实现了一个客户端连接触发一次事件,并起个线程去定时持续的发送消息。

  • 相关阅读:
    什么牌子的蓝牙耳机音质好?2022好音质蓝牙耳机推荐
    Linux 下静态库与动态库的制作与使用
    基于FPGA的交通信号灯设计
    易观千帆 | 2023年4月证券APP月活跃用户规模盘点
    Base64
    【网络安全培训】无线局域网的安全威胁都有哪些?
    Docker-07:Docker网络管理
    Vue的性能优化:如何利用Vue的各种特性和技术,例如懒加载、代码分割、预渲染等,来优化Vue应用的性能
    Python手写基因编程
    【学习笔记16】JavaScript函数封装习题
  • 原文地址:https://blog.csdn.net/web2022050901/article/details/126120950