• fastapi实现websocket


    fastapi实现websocket

    websockets.py

    主要实现了分房间聊天的功能,以及用户在房间内进行某些操作之后的处理。
    注:目前还不是很完善,后续将进行补充。

    import json
    import logging
    import os
    from http import HTTPStatus
    from select import select
    from typing import Dict, List, Optional

    from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect
    from fastapi.responses import HTMLResponse
    from sqlalchemy.orm import Session
    from starlette.requests import Request

    from models.users import Room
    from settings.settings import get_db
    from crud.users import get_users, get_user, get_mobile_user, create_user, delete_user
    from schemas import users as schemas_user
    # from schemas.users import * 
    from utils.jwt import *
    from utils.redis_utils import redis_get_msg, redis_set_msg, redis
    from config.databases import REDIS_DATABASES_URL
    
    # HTML document returned to the browser by the GET /ws/ handler for display.
    # NOTE(review): the literal below contains only bare text ("websocket",
    # "User1 Chat") and no tags or script, so the markup appears to have been
    # lost at some point -- confirm against the original template before use.
    html = """
        
        
        
            
            websocket
        
        
        

    User1 Chat

    """
    logger = logging.getLogger(__name__)

    # Sub-router that carries every chat endpoint defined in this module.
    router = APIRouter()


    class ConnectionManager:
        """Track live WebSocket connections and group them into chat rooms.

        Two pieces of state are kept in process memory:

        * ``active_connections`` -- every accepted socket, used by ``broadcast``.
        * ``sessions`` -- ``room_id -> [sockets]``, used by ``send_msg_room``.

        The user id and room id belonging to a socket are stored in Redis under
        keys built by formatting the socket object into a prefix.
        NOTE(review): that key is presumably the default object repr of the
        socket, which is only meaningful inside the current process -- confirm
        before sharing these keys between workers.
        """

        # Redis key templates; the placeholder is filled with the socket object.
        _USER_KEY = 'session2userid:{}'
        _ROOM_KEY = 'session2roomid:{}'

        def __init__(self):
            """Start with no connections and no rooms."""
            self.active_connections: List[WebSocket] = []
            self.sessions: Dict[int, List[WebSocket]] = {}

        # ------------------------------------------------------------------
        # Connection bookkeeping
        # ------------------------------------------------------------------
        async def connect(self, ws: WebSocket):
            """Accept the handshake and remember the socket."""
            await ws.accept()
            self.active_connections.append(ws)

        def disconnect(self, ws: WebSocket):
            """Forget the socket.

            Safe to call more than once: ``on_close`` already removes the socket,
            and the endpoint handlers call this again when the disconnect is
            observed, which previously raised ``ValueError``.
            """
            if ws in self.active_connections:
                self.active_connections.remove(ws)

        async def on_close(self, websocket: WebSocket, close_code: int):
            """Close the socket with ``close_code`` and stop tracking it."""
            await websocket.close(close_code)
            self.disconnect(websocket)

        # ------------------------------------------------------------------
        # Sending
        # ------------------------------------------------------------------
        @staticmethod
        async def send_personal_message(message: str, ws: WebSocket):
            """Send ``message`` as text to a single socket."""
            await ws.send_text(message)

        @staticmethod
        async def send_user_message(message: str, ws: WebSocket):
            """Point-to-point send; currently identical to ``send_personal_message``."""
            await ws.send_text(message)

        async def broadcast(self, message: str):
            """Send ``message`` to every tracked connection."""
            # Iterate over a snapshot: each send awaits, and another task may
            # connect or disconnect a socket in the meantime.
            for connection in list(self.active_connections):
                await connection.send_text(message)

        # ------------------------------------------------------------------
        # Incoming message handling
        # ------------------------------------------------------------------
        async def filter_message(self, message: str):
            """Parse ``message`` as JSON.

            Returns the decoded value, or ``None`` when the text is not valid JSON.
            """
            try:
                json_object = json.loads(message)
            except ValueError:
                logger.warning('message is not valid JSON: %r', message)
                return None
            logger.debug('json_object %s', json_object)
            return json_object

        async def on_message(self, message: str, ws: WebSocket):
            """Decode a control message such as ``{"event": "join"}`` and react to it.

            Recognised events are ``pong``, ``ping``, ``join`` and ``close``.
            Anything else -- invalid JSON, a JSON value that is not an object, or
            an object without a known ``event`` -- is answered with
            ``'wrong message format'``.
            """
            logger.debug('on_message %s', message)
            msg = await self.filter_message(message)
            # Only a JSON object can carry an event; guard against e.g. `123`
            # or `{}` which would otherwise raise on msg['event'].
            event = msg.get('event') if isinstance(msg, dict) else None

            if event == 'pong':
                await self.send_personal_message('ping', ws)
            elif event == 'ping':
                await self.send_personal_message('pong', ws)
            elif event == 'join':
                # Entering a room; room registration is handled by the endpoint.
                await self.send_personal_message('success', ws)
            elif event == 'close':
                await self.send_personal_message('再见', ws)
                # NOTE(review): 1008 is the "policy violation" close code in
                # RFC 6455; 1000 (normal closure) may be intended -- confirm.
                await self.on_close(ws, 1008)
            else:
                # The original call omitted `ws` and raised TypeError.
                await self.send_personal_message('wrong message format', ws)

        # ------------------------------------------------------------------
        # Rooms
        # ------------------------------------------------------------------
        async def get_room_info(self, room_id, db):
            """Return the first ``Room`` row whose id equals ``room_id`` (or ``None``)."""
            return db.query(Room).filter(Room.id == room_id).first()

        async def register_user(self, room_id, user_id, ws: WebSocket):
            """Record the socket's user and room in Redis and add it to the room."""
            await self.set_userid(user_id, ws)
            await self.set_roomid(room_id, ws)
            self.sessions.setdefault(room_id, []).append(ws)
            logger.debug('self.sessions %s', self.sessions)

        async def test(self):
            """Debug helper: read the two keys formatted with the manager itself."""
            async with redis.client() as conn:
                val = await conn.get(self._USER_KEY.format(self))
                room_id = await conn.get(self._ROOM_KEY.format(self))
            return val, room_id

        async def deregister_user(self, ws: WebSocket):
            """Remove the socket's Redis keys and take it out of its room.

            Does nothing beyond the key cleanup when the socket was never
            registered (for example on the ``/test/`` endpoint).
            """
            user_id = await self.get_userid(ws)
            room_id = await self.get_roomid(ws)
            # Pass the socket explicitly: without it the keys were formatted
            # with the manager object and therefore never deleted.
            await self.del_userid(ws)
            await self.del_roomid(ws)

            # Only sockets that went through register_user have both values.
            if not user_id or room_id is None:
                return

            room_key = int(room_id)
            members = self.sessions.get(room_key)
            if members and ws in members:
                members.remove(ws)
                if not members:
                    # Drop empty rooms so the mapping does not grow forever.
                    del self.sessions[room_key]
            logger.debug('session %s', self.sessions)

        async def leave_room(self, room_id, user_id):
            """Hook for business-level handling of a user leaving; not implemented."""
            pass

        async def send_msg_room(self, msg, ws1: WebSocket):
            """Send ``msg`` to everyone in ``ws1``'s room except ``ws1`` itself."""
            logger.debug('session %s', self.sessions)
            # Sender identity is loop-invariant, so look it up once.
            user_id = await self.get_userid(ws1)
            room_id = await self.get_roomid(ws1)
            if room_id is None:
                logger.warning('send_msg_room: sender has no room, message dropped')
                return

            # Snapshot the member list: sends await and membership may change.
            for ws in list(self.sessions.get(int(room_id), ())):
                if ws is ws1:
                    continue
                logger.debug('send msg user[%s] room_id [%s]', user_id, room_id)
                await ws.send_text(msg)

        # ------------------------------------------------------------------
        # Redis-backed socket metadata
        # ------------------------------------------------------------------
        async def set_sessions(self, ws: WebSocket):
            """Store the session in Redis.

            NOTE(review): left as written. ``json.dumps`` cannot serialise a
            WebSocket object and will raise ``TypeError``, and ``hset`` is given
            no field/value pair, so this method cannot succeed as it stands.
            It is not called anywhere in this module -- decide on the intended
            payload before using it.
            """
            async with redis.client() as conn:
                await conn.hset('sessions', json.dumps([ws]))

        async def set_userid(self, user_id, ws: WebSocket):
            """Store ``user_id`` for the socket and return the value read back."""
            key = self._USER_KEY.format(ws)
            async with redis.client() as conn:
                await conn.set(key, user_id)
                return await conn.get(key)

        async def set_roomid(self, room_id, ws: WebSocket):
            """Store ``room_id`` for the socket and return the value read back."""
            key = self._ROOM_KEY.format(ws)
            async with redis.client() as conn:
                await conn.set(key, room_id)
                return await conn.get(key)

        async def get_userid(self, ws: WebSocket):
            """Return the user id stored for the socket, or ``None``."""
            async with redis.client() as conn:
                return await conn.get(self._USER_KEY.format(ws))

        async def get_roomid(self, ws: WebSocket):
            """Return the room id stored for the socket, or ``None``."""
            async with redis.client() as conn:
                return await conn.get(self._ROOM_KEY.format(ws))

        async def del_userid(self, ws: Optional[WebSocket] = None):
            """Delete the user-id key of ``ws``.

            ``ws`` is optional only for backward compatibility: when omitted the
            key is formatted with the manager itself, as the original code did.
            """
            target = self if ws is None else ws
            async with redis.client() as conn:
                await conn.delete(self._USER_KEY.format(target))

        async def del_roomid(self, ws: Optional[WebSocket] = None):
            """Delete the room-id key of ``ws`` (same fallback as ``del_userid``)."""
            target = self if ws is None else ws
            async with redis.client() as conn:
                await conn.delete(self._ROOM_KEY.format(target))


    # Single shared manager used by every endpoint below.
    manager = ConnectionManager()


    @router.get("/ws/")
    async def index():
        """Serve the chat page."""
        return HTMLResponse(html)


    @router.websocket("/ws/{user}")
    async def websocket_endpoint(websocket: WebSocket, user: str):
        """Global chat: echo each message to the sender and broadcast it to all."""
        await manager.connect(websocket)
        await manager.broadcast(f"用户{user}进入聊天室")
        try:
            while True:
                data = await websocket.receive_text()
                await manager.send_personal_message(f"你说了: {data}", websocket)
                await manager.broadcast(f"用户:{user} 说: {data}")
        except WebSocketDisconnect:
            manager.disconnect(websocket)
            await manager.broadcast(f"用户-{user}-离开")


    @router.websocket("/ws/{room_id}/{user_id}")
    async def join_room(websocket: WebSocket, room_id: int, user_id: str, db: Session = Depends(get_db)):
        """Room chat: register the user in ``room_id`` and relay messages to the room."""
        # TODO: check whether this user already has a live connection.
        await manager.connect(websocket)
        await manager.register_user(room_id, user_id, websocket)
        await manager.send_msg_room(f"用户-{user_id}-进入", websocket)
        try:
            while True:
                data = await websocket.receive_text()
                await manager.send_personal_message(f"你说了: {data}", websocket)
                # Relay the message to the other members of the room.
                await manager.send_msg_room(f"{user_id}说了: {data}", websocket)
        except WebSocketDisconnect:
            manager.disconnect(websocket)
            # Announce the departure BEFORE deregistering: send_msg_room looks
            # the room up through the socket's Redis keys, which
            # deregister_user deletes.
            await manager.send_msg_room(f"用户-{user_id}-离开", websocket)
            await manager.deregister_user(websocket)


    @router.websocket("/test/")
    async def websocket_test(websocket: WebSocket):
        """Test endpoint: echo each message, then interpret it as a control event."""
        # TODO: reject the connection when no token is supplied.
        await manager.connect(websocket)
        try:
            while True:
                data = await websocket.receive_text()
                await manager.send_personal_message(f"你说了: {data}", websocket)
                # Check whether the text is a well-formed control message.
                await manager.on_message(data, websocket)
        except WebSocketDisconnect:
            manager.disconnect(websocket)
            await manager.deregister_user(websocket)
            await manager.broadcast(f"用户-test-离开")
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322
    • 323
    • 324
    • 325
    • 326
    • 327
    • 328
    • 329
    • 330
    • 331
    • 332
    • 333
  • 相关阅读:
    关于Facebook营销的十个常见问题,一次性讲清楚!
    ECO概念及理解
    深度学习(18):nerf、nerf-pytorch代码运行与学习
    js---封装浅拷贝和深拷贝
    Spark SQL将Hive表中的数据写入到MySQL数据库中
    【C语言】指针那些事之数组传参和指针传参的区别
    java毕业生设计医院取药系统计算机源码+系统+mysql+调试部署+lw
    迅为i.MX8Mmini开发板离线构建Yocto系统
    阿里也出手了!Spring CloudAlibaba AI问世了
    vue中使用wangeditor富文本编辑器
  • 原文地址:https://blog.csdn.net/m0_47202787/article/details/126528139