• vue3+ts+java使用WebSocket传输数据


    一、环境

    系统:win11

    IDE:vscode

    框架:electron22.0.0+vite2+vue3+typescript4.8.4+springboot2.2.5+jdk1.8

    二、websocket介绍

    2.1 由来

            WebSocket未出现之前,浏览器和服务器之间的通信是通过Web的poll技术进行通信,就是浏览器不停的对服务器主动发动请求,每发起一次新的HTTP请求,就是开启一次新的TCP链接,HTTP协议本身就是一种开销非常大的协议,所以这种方式效率低下。于是就出现了WebSocket协议。

            下面是采用poll方式的代码示例:

    1. setInterval(() => {
    2. // 查询注册机列表
    3. getRegisterInfo().then(res => {
    4. isHost.value = store.state.onHost;
    5. }).catch(err => {
    6. console.log('getComputerList err:', err);
    7. });
    8. }, 1000);

    为了页面及时更新,会像服务器产生大量的请求,造成资源浪费。

    2.2 WebSocket通信过程

            WebSocket是一种完全不同于HTTP的协议,但是它需要通过HTTP协议的GET请求,将HTTP协议升级为WebSocket协议。升级的过程被称为握手(handshake)。当浏览器和服务器握手成功后,则可以开始根据WebSocket定义的通信帧格式开始通信了。WebSocket协议的通信帧也分为控制数据帧和普通数据帧,前者用于控制WebSocket链接状态,后者用于承载数据。

            握手过程就是将HTTP协议升级为WebSocket协议的过程。在HTTP的GET请求头部添加信息如下:

    Upgrade: websocket      #规定必需的字段,其值必需为 websocket , 如果不是则握手失败;
    Connection: Upgrade  #规定必需的字段,值必需为 Upgrade , 如果不是则握手失败;
    Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==    #必需字段,一个随机的字符串;
    Sec-WebSocket-Protocol: chat, superchat   #可选字段,可以用于标识应用层的协议;
    Sec-WebSocket-Version: 13  #必需字段,代表了 WebSocket 协议版本,值必需是 13 , 否则
    握手失败;

    当服务器端,成功验证了以上信息后,则会返回一个形如以下信息的响应:

    HTTP/1.1 101 Switching Protocols   #101代表握手成功的状态码
    Upgrade: websocket  #规定必需的字段,其值必需为 websocket , 如果不是则握手失败;
    Connection: Upgrade #规定必需的字段,值必需为 Upgrade , 如果不是则握手失败;
    Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=  #规定必需的字段,该字段的值是通过固定字符串 258EAFA5-E914-47DA-95CA-C5AB0DC85B11 加上请求中 Sec-WebSocket-Key 字段的值,然后再对其结果通过 SHA1 哈希算法求出的结果。
    Sec-WebSocket-Protocol: chat  #对应于请求中的 Sec-WebSocket-Protocol 字段;

    2.3 WebSocket 优缺点

            优点:

            1、使用资源少。创建连接后,数据叫唤的包头较少;

            2、能实现及时通信。长连接,实时通信;

            3、更好的二进制支持。能更好的处理二进制内容;

            4、支持拓展。用户可以拓展协议,实现部分自定义的子协议。

            缺点:

            1、使用WebSocket,长连接,会占用一定资源;

            2、浏览器品类多,支持程度不同,可能问题多;

            3、与poll相比,代码复杂度将上升,完全依赖于websocket,要多写逻辑对websocket状态进行监控,对开发者要求也会高一些。

           没有完美的事物,我们讨论优缺点的目的是它适合什么场景,在要求实时性较高的应用时,那么WebSocket会更适合。如果基本都是操作的应用,实时性要求很低,那么WebSocket使用的资源成本就是不合适的。

    2.4 浏览器支持

          WebSocket - Web API 接口参考 | MDNWebSocket 对象提供了用于创建和管理 WebSocket 连接,以及可以通过该连接发送和接收数据的 API。icon-default.png?t=N7T8https://developer.mozilla.org/zh-CN/docs/Web/API/WebSocket以上是供参考的API接口,本文不做赘述,自己进入使用。

    三、前端使用示例

    文件名称

    具体代码

    1. /*
    2. * @Descripttion: 封装socket方法
    3. * @version:
    4. * @Date: 2021-08-06 11:14:39
    5. * @LastEditTime: 2021-10-26 14:06:34
    6. */
    7. import { store } from "../store";
    8. import { ElMessage } from "element-plus";
    9. import { Base64 } from "js-base64";
    10. import { updateComputerIsValid } from "/@/service/AppService";
    11. import { localIp, playDutySound } from "../CommonUtil";
    12. import { deal3004Procotol, deal3005Procotol, deal3006Procotol } from "/@/service/WebsocketService"; //业务代码
    13. interface socket {
    14. websocket: any;
    15. connectURL: string;
    16. socket_open: boolean;
    17. hearbeat_timer: any;
    18. hearbeat_interval: number;
    19. is_reonnect: boolean;
    20. reconnect_count: number;
    21. reconnect_current: number;
    22. ronnect_number: number;
    23. reconnect_timer: any;
    24. reconnect_interval: number;
    25. // init: (receiveMessage: Function | null) => any;
    26. init: () => any;
    27. receive: (message: any) => void;
    28. heartbeat: () => void;
    29. send: (data: any, callback?: any) => void;
    30. close: () => void;
    31. reconnect: () => void;
    32. webSocketBack?: (message: any) => void;
    33. }
    34. const socket: socket = {
    35. websocket: null,
    36. connectURL: import.meta.env.VITE_WEBSOCKET_MONITOR_URL + localIp().replaceAll(".", ""),
    37. // 开启标识
    38. socket_open: false,
    39. // 心跳timer
    40. hearbeat_timer: null,
    41. // 心跳发送频率
    42. hearbeat_interval: 3000,
    43. // 是否自动重连
    44. is_reonnect: true,
    45. // 重连次数
    46. reconnect_count: 5000,
    47. // 已发起重连次数
    48. reconnect_current: 1,
    49. // 网络错误提示此时
    50. ronnect_number: 0,
    51. // 重连timer
    52. reconnect_timer: null,
    53. // 重连频率 不能设置的太小,否则会出现一次重连未返回的时候,下一次又开始重连
    54. reconnect_interval: 6000,
    55. // init: (receiveMessage: Function | null) => {
    56. init: () => {
    57. if (!("WebSocket" in window)) {
    58. // if (!("WebSocket" in window)) {
    59. ElMessage.warning("浏览器不支持WebSocket");
    60. return null;
    61. }
    62. // 已经创建过连接不再重复创建
    63. if (socket.websocket) {
    64. return socket.websocket;
    65. }
    66. socket.websocket = new WebSocket(socket.connectURL);
    67. socket.websocket.onmessage = (e: any) => {
    68. // if (receiveMessage) {
    69. // receiveMessage(e);
    70. // }
    71. if (socket.webSocketBack) {
    72. socket.webSocketBack(e);
    73. }
    74. };
    75. socket.websocket.onclose = (e: any) => {
    76. console.log("websocket--关闭", socket.reconnect_current,e);
    77. if (socket.hearbeat_timer) {
    78. clearInterval(socket.hearbeat_timer);
    79. }
    80. //业务代码- 置位为1
    81. updateComputerIsValid(localIp(), 1);
    82. socket.socket_open = false;
    83. // 需要重新连接
    84. if (socket.is_reonnect) {
    85. console.log("websocket--需要重新连接", socket.is_reonnect,socket.reconnect_interval);
    86. socket.reconnect_timer = setTimeout(() => {
    87. console.log("websocket--重连", socket.reconnect_current);
    88. // 超过重连次数
    89. if (
    90. socket.reconnect_current > socket.reconnect_count &&
    91. socket.reconnect_count > -1
    92. ) {
    93. console.log("websocket--超过重连次数");
    94. clearTimeout(socket.reconnect_timer);
    95. socket.is_reonnect = false;
    96. return;
    97. }
    98. // 记录重连次数
    99. socket.reconnect_current++;
    100. //清除 socket.websocket
    101. socket.websocket = null;
    102. socket.reconnect();
    103. }, socket.reconnect_interval);
    104. }
    105. };
    106. // 连接成功
    107. socket.websocket.onopen = function () {
    108. console.log("websocket--连接成功");
    109. //业务代码
    110. updateComputerIsValid(localIp(), 0);
    111. socket.socket_open = true;
    112. socket.is_reonnect = true;
    113. // 开启心跳
    114. socket.heartbeat();
    115. };
    116. // 连接发生错误
    117. socket.websocket.onerror = function () {
    118. console.log("websocket--发生错误!关闭执行重连");
    119. socket.websocket.onclose();
    120. };
    121. },
    122. send: (data, callback = null) => {
    123. // 开启状态直接发送
    124. if (socket.websocket.readyState === socket.websocket.OPEN) {
    125. socket.websocket.send(JSON.stringify(data));
    126. if (callback) {
    127. callback();
    128. }
    129. // 正在开启状态,则等待1s后重新调用
    130. } else {
    131. clearInterval(socket.hearbeat_timer);
    132. if (socket.ronnect_number < 1) {
    133. // ElMessage({
    134. // type: 'error',
    135. // message: i18n.global.t('chat.unopen'),
    136. // duration: 0,
    137. // })
    138. console.log("服务关闭了!");
    139. }
    140. socket.ronnect_number++;
    141. }
    142. },
    143. receive: (message: any) => {
    144. let params = Base64.decode(JSON.parse(message.data).data);
    145. params = JSON.parse(params);
    146. return params;
    147. },
    148. heartbeat: () => {
    149. if (socket.hearbeat_timer) {
    150. clearInterval(socket.hearbeat_timer);
    151. }
    152. socket.hearbeat_timer = setInterval(() => {
    153. let diffMs = Number(new Date()) - Number(store.state.webSocketLastTime);
    154. console.log("websocket--上次间隔时间:", diffMs, "3秒以上才发送心跳包");
    155. if (diffMs > 0) {
    156. let data = {
    157. // languageId: store.state.users.language,
    158. content: "ping",
    159. };
    160. var sendDara = {
    161. encryption_type: "base64",
    162. data: Base64.encode(JSON.stringify(data)),
    163. };
    164. socket.send(sendDara);
    165. store.commit("setWebSocketLastTime", new Date());
    166. console.log(
    167. "websocket--心跳发送",
    168. sendDara,
    169. "更新时间:",
    170. store.state.webSocketLastTime
    171. );
    172. }
    173. }, socket.hearbeat_interval);
    174. },
    175. close: () => {
    176. clearInterval(socket.hearbeat_timer);
    177. socket.is_reonnect = false;
    178. socket.websocket.onclose();
    179. },
    180. /**
    181. * 重新连接
    182. */
    183. reconnect: () => {
    184. //websocket存在且不想重连的时候
    185. if (!socket.is_reonnect) {
    186. // if (socket.websocket && !socket.is_reonnect) {
    187. console.log("websocket--存在但是不需要重连的时候,关闭", socket.websocket, socket.is_reonnect);
    188. socket.close();
    189. }
    190. socket.init();
    191. },
    192. /**
    193. * 业务代码--数据处理
    194. * @param backMessage
    195. */
    196. webSocketBack(backMessage: any) {
    197. store.commit("setWebSocketLastTime", new Date());
    198. console.log(
    199. "websocket-接受到的信息" + JSON.stringify(backMessage),
    200. "更新的时间:",
    201. store.state.webSocketLastTime
    202. );
    203. const wsData = backMessage.data.split("|");
    204. const wsDataCode = backMessage.data.split("|")[0];
    205. // 零位是协议号
    206. switch (wsDataCode) {
    207. // 值班机获取 提醒间隔时间 后的处理
    208. case "3002": {
    209. console.log("收到ws:3002: " + JSON.stringify(wsData));
    210. const setHost = wsData[1];
    211. store.commit("setDutyConfirmTime", Number(wsData[2]));
    212. if (setHost === "0") {
    213. store.commit("setLocalComputerDutyState", 0);
    214. store.commit("setOnDutyState", 0);
    215. } else {
    216. store.commit("setLocalComputerDutyState", 1);
    217. store.commit("setOnDutyState", 1);
    218. }
    219. break;
    220. }
    221. case "3003": {
    222. console.log("收到ws:3003", wsDataCode);
    223. if (wsData[1] === "0") {
    224. playDutySound();
    225. if (store.state.onDutyState === 0) {
    226. } else if (store.state.onDutyState === 1) {、
    227. playDutySound();
    228. store.commit("setOnDutyState", true);
    229. }
    230. } else if (wsData[1] === "1") {
    231. store.commit("setOnDutyState", false);
    232. }
    233. break;
    234. }
    235. case "3004": {
    236. //更新store中的数据
    237. deal3004Procotol(wsData);
    238. break;
    239. }
    240. case "3005": {
    241. //更新store中的数据
    242. deal3005Procotol(wsData);
    243. break;
    244. }
    245. case "3006": {
    246. //更新store中的数据
    247. deal3006Procotol(wsData);
    248. break;
    249. }
    250. }
    251. },
    252. };
    253. export default socket;

    其中业务代码请不用关注,自己实现自己的业务逻辑即可。

    1001错误

    对于重连时间的设置,如果设置的时间太短,会出现反复1001错误(The WebSocket session [] timeout expired)关闭再重连的现象:

    把服务端关闭后

    每次错误返回中间有两次重连操作,所以调整了重连间隔时间,错误消失,推论:一次重连结果还未出来的时候,又发起了地址一样的连接请求,造成冲突,会关闭上次连接,这次关闭会引发上次连接的重连,这就造成了反复重连。目前我采用的是拉长重连时间,比较简单,可以尝试通过判断连接状态来阻止一次连接没完成之前再次连接。

    流程图

    启动连接

    1. //APP.VUE
    2. import socket from "/@/utils/websocket";
    3. onMounted(async () => {
    4. socket.init();
    5. });

    四、后端服务

    引入依赖包

    1. <dependency>
    2. <groupId>org.springframework.bootgroupId>
    3. <artifactId>spring-boot-starter-websocketartifactId>
    4. dependency>

    客户端ip获取:

    在mainApplication上添加下面注解:

    @ServletComponentScan("**.**.filter")  //防止 @WebListener 无效
    1. import javax.servlet.ServletResponse;
    2. import javax.servlet.http.HttpServletRequest;
    3. import java.io.IOException;
    4. @javax.servlet.annotation.WebFilter(filterName = "sessionFilter",urlPatterns = "/*")
    5. @Order(1)
    6. public class WebFilter implements Filter {
    7. @Override
    8. public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
    9. HttpServletRequest req= (HttpServletRequest) servletRequest;
    10. req.getSession().setAttribute("ip",req.getRemoteHost());
    11. filterChain.doFilter(servletRequest,servletResponse);
    12. }
    13. }

    WebSocket配置类

    在这里也做了IP的获取

    1. @Configuration
    2. public class WebSocketConfig extends ServerEndpointConfig.Configurator {
    3. /**
    4. * 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint
    5. */
    6. @Bean
    7. public ServerEndpointExporter serverEndpointExporter() {
    8. return new ServerEndpointExporter();
    9. }
    10. @Override
    11. public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
    12. Map attributes = sec.getUserProperties();
    13. HttpSession session = (HttpSession) request.getHttpSession();
    14. if (session != null) {
    15. attributes.put(GlobalContants.IP_ADDR, session.getAttribute("ip"));
    16. Enumeration names = session.getAttributeNames();
    17. while (names.hasMoreElements()) {
    18. String name = names.nextElement();
    19. attributes.put(name, session.getAttribute(name));
    20. }
    21. }
    22. }
    23. }

    Websocket接收

    1. import com.baomidou.mybatisplus.core.toolkit.ArrayUtils;
    2. import com.deyou.cabin.monitor.common.GlobalContants;
    3. import com.deyou.cabin.monitor.common.GlobalParams;
    4. import com.deyou.cabin.monitor.common.utils.AssembleDownProtocolUtils;
    5. import com.deyou.cabin.monitor.common.utils.CommonServeUtils;
    6. import com.deyou.cabin.monitor.config.WebSocketConfig;
    7. import com.deyou.cabin.monitor.model.WebSocketModel;
    8. import lombok.extern.slf4j.Slf4j;
    9. import org.apache.commons.lang3.StringUtils;
    10. import org.springframework.stereotype.Component;
    11. import javax.websocket.OnClose;
    12. import javax.websocket.OnError;
    13. import javax.websocket.OnMessage;
    14. import javax.websocket.OnOpen;
    15. import javax.websocket.Session;
    16. import javax.websocket.server.PathParam;
    17. import javax.websocket.server.ServerEndpoint;
    18. import java.time.LocalDateTime;
    19. import java.util.ArrayList;
    20. import java.util.List;
    21. import java.util.Map;
    22. @Slf4j
    23. @Component
    24. //@RequiredArgsConstructor
    25. @ServerEndpoint(value = "/websocket/monitor/{code}",configurator = WebSocketConfig.class)
    26. public class WebsocketController {
    27. /**
    28. * 连接建立成功调用的方法
    29. */
    30. @OnOpen
    31. public void onOpen(Session session,@PathParam(value = "code") String code) {
    32. try
    33. {
    34. session.setMaxIdleTimeout(30000);
    35. }
    36. catch (Exception e)
    37. {
    38. log.error(e.getMessage(),e);
    39. }
    40. }
    41. /**
    42. * 连接关闭调用的方法
    43. */
    44. @OnClose
    45. public void onClose(Session session) {
    46. try
    47. {
    48. log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(),
    49. }
    50. catch (Exception e)
    51. {
    52. log.error(e.getMessage(),e);
    53. }
    54. }
    55. /**
    56. * 收到客户端消息后调用的方法
    57. *
    58. * @param message
    59. *
    60. */
    61. @OnMessage
    62. public void onMessage(String message, Session session) {
    63. try
    64. {
    65. log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
    66. }
    67. catch (Exception ex)
    68. {
    69. log.error(ex.getMessage(),ex);
    70. }
    71. }
    72. @OnError
    73. public void onError(Session session, Throwable error) {
    74. log.error("websocket发生错误:" + session.getId() + "---" + error.getMessage(),error);
    75. }
    76. public void sendMessageToAll(String message, Session fromSession) {
    77. try
    78. {
    79. //GlobalParams.webSocketModelMap是全局变量 ConcurrentHashMap webSocketModelMap
    80. for (Map.Entry sessionEntry : GlobalParams.webSocketModelMap.entrySet()) {
    81. Session toSession = sessionEntry.getValue().getSession();
    82. // 排除掉自己
    83. if (!fromSession.getId().equals(toSession.getId())) {
    84. log.info("服务端给客户端[{}][{}]发送消息{}", toSession.getId(),
    85. sessionEntry.getValue().getWebSocketCode(), message);
    86. sendMessToOne(message,toSession);
    87. }
    88. }
    89. }
    90. catch (Exception e)
    91. {
    92. log.error(e.getMessage(),e);
    93. }
    94. }
    95. public void sendMessageToAll(String message) {
    96. try {
    97. //GlobalParams.webSocketModelMap是全局变量 ConcurrentHashMap webSocketModelMap
    98. for (Map.Entry sessionEntry : GlobalParams.webSocketModelMap.entrySet()) {
    99. Session toSession = sessionEntry.getValue().getSession();
    100. log.info("服务端给客户端[{}][{}]发送消息{}", toSession.getId(), sessionEntry.getValue().getWebSocketCode(), message);
    101. sendMessToOne(message, toSession);
    102. }
    103. } catch (Exception e) {
    104. log.error(e.getMessage(), e);
    105. }
    106. }
    107. public void sendMessToOne(String message, Session toSession) {
    108. try {
    109. // 尝试过锁住方法,还是不行,这里锁住webSocketMap,让多线程,访问不同对象,也能同步
    110. synchronized(GlobalParams.webSocketModelMap){
    111. String toId = toSession.getId();
    112. if (StringUtils.isNotBlank(toId) && GlobalParams.webSocketModelMap.containsKey(toId)) {
    113. GlobalParams.webSocketModelMap.get(toId).getSession().getBasicRemote().sendText(message);
    114. }
    115. }
    116. } catch (Exception e) {
    117. log.error("服务端发送消息给客户端失败,"+e.getMessage(),e);
    118. }
    119. }
    120. }

    其中,synchronized(GlobalParams.webSocketModelMap)中GlobalParams.webSocketModelMap是我记录当前在线的websocket的信息。上边代码的注释中已经写了,这个锁的目的是为了解决websocket服务端下发时出现的错误“The remote endpoint was in state [STREAM_WRITING] which is an invalid state for called method”的错误,问题的引发场景和分析个人记录如下: 

    1.因为在 @OnMessage中,我有两个方法同时使用了session,导致session多线程不安全,发生的频次少都可能不出现这个问题!

    2.JSON.toJSONString(GlobalParams.webSocketModelMap) 其中带有session,会引发这个问题 解决办法:加异步锁,但是需要锁定 ConcurrentHashMap

    使用

    1. @Resource
    2. private WebsocketController websocketService;
    1. try{
    2. websocketService.sendMessToOne(sendMes, toSession);
    3. }catch (Exception e){
    4. log.error(e.getMessage(),e);
    5. }

    5、结束

    连接地址:ws://IP:PORT/websocket/monitor/{code} ,其中code是你自己定义的值。


  • 相关阅读:
    Private market:借助ZK实现的任意计算的trustless交易
    22. containerd使用Devmapper snapshotter讲解
    卡尔曼及扩展卡尔曼滤波详细推导-来自DR_CAN视频
    antv/x6 键盘快捷键事件
    残差resnet复现,源代码理解
    为什么用Selenium做自动化测试,你真的知道吗?
    有趣的前端面试题
    CURL简单使用
    docker启动镜像命令
    unity 点击3D物体
  • 原文地址:https://blog.csdn.net/hchyboy/article/details/132927047