• spring-websocket实现(一)


    websocket介绍

            WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范。WebSocket API也被W3C定为标准。

            WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

    实现

            在spring中,使用最基础的spring-websocket的方式有两种。一种是利用注解的方式,一种是利用直接通过实现WebSocketConfigurer配置来实现。

    本次先实现第一种实现WebSocketConfigurer配置来实现。

    后端实现

    先引入依赖:

    
        org.springframework.boot
        spring-boot-starter-websocket
    

     处理器

            首先,需要实现WebSocketHandler接口,WebSocketHandler定义了处理连接、处理数据、关闭连接、以及连接异常锁调用的方法,在实现类中我们可以定义需要的操作。

    1. public class WebSocketEndPointHandler implements WebSocketHandler {
    2. private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketEndPointHandler.class);
    3. private static WebSessionSendService webSessionSendService = new WebSessionSendService();
    4. /**
    5. * 连接处理
    6. */
    7. @Override
    8. public void afterConnectionEstablished(WebSocketSession session) throws Exception {
    9. LOGGER.info("WebSocketEndPointHandler:afterConnectionEstablished");
    10. List appIds = session.getHandshakeHeaders().get("Sec-WebSocket-Protocol");
    11. webSessionSendService.addAppSession(appIds.get(0), session);
    12. }
    13. /**
    14. * 处理客户端发送过来的数据
    15. * @param message 消息
    16. */
    17. @Override
    18. public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception {
    19. LOGGER.info("收到消息:{}", message.getPayload());
    20. }
    21. /**
    22. * 处理传输过程中发生的错误
    23. */
    24. @Override
    25. public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
    26. LOGGER.info("------WebSocketEndPointHandler:handleTransportError");
    27. }
    28. /**
    29. * 处理连接关闭之后触发操作
    30. * @param closeStatus 关闭的状态信息
    31. */
    32. @Override
    33. public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
    34. LOGGER.info("------WebSocketEndPointHandler:afterConnectionClosed");
    35. List appIds = session.getHandshakeHeaders().get("Sec-WebSocket-Protocol");
    36. webSessionSendService.deleteAppSession(appIds.get(0));
    37. }
    38. @Override
    39. public boolean supportsPartialMessages() {
    40. return false;
    41. }
    42. }

            为了实现从后端将数据发送到前端。所以我们需要记录下每一个前端websocket连接的WebSocketSession。在需要发送数据的时候取出来判断并发送消息。

    1. @Service
    2. public class WebSessionSendService {
    3. private static ConcurrentHashMap sessionPool = new ConcurrentHashMap<>();
    4. public void addAppSession(String appId, WebSocketSession session){
    5. sessionPool.put(appId, session);
    6. }
    7. public void deleteAppSession(String appId){
    8. sessionPool.remove(appId);
    9. }
    10. public void sendMessage(String appId, String message){
    11. WebSocketSession session = sessionPool.get(appId);
    12. if(session == null || !session.isOpen()){
    13. return;
    14. }
    15. TextMessage textMessage = new TextMessage(message);
    16. try {
    17. session.sendMessage(textMessage);
    18. } catch (IOException e) {
    19. throw new RuntimeException(e);
    20. }
    21. }
    22. }

    拦截器 

            其次,实现一个拦截器,用户拦截websocket过来的握手请求,这个可以根据自身需要决定要不要实现。拦截可以实现 HttpSessionHandshakeInterceptorHandshakeInterceptor都可以。

    1. public class MessageHandshakeInterceptor extends HttpSessionHandshakeInterceptor {
    2. private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandshakeInterceptor.class);
    3. @Override
    4. public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) throws Exception {
    5. LOGGER.info("---------MessageHandshakeInterceptor:beforeHandshake");
    6. super.beforeHandshake(request, response, wsHandler, attributes);
    7. //这里可以拦截请求头, 做用户认证。
    8. List appIds = request.getHeaders().get("Sec-WebSocket-Protocol");
    9. if(CollectionUtils.isEmpty(appIds)){
    10. LOGGER.info("websocket缺少用户认证信息");
    11. return false;
    12. }
    13. response.getHeaders().set("Sec-WebSocket-Protocol", appIds.get(0));
    14. return true;
    15. }
    16. @Override
    17. public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
    18. LOGGER.info("--------MessageHandshakeInterceptor:afterHandshake");
    19. super.afterHandshake(request, response, wsHandler, exception);
    20. }
    21. }

    配置项

    最后是配置项。通过实现WebSocketConfigurer配置相关信息。

    1. @Configuration
    2. @EnableWebSocket
    3. public class PathWebSocketConfig implements WebSocketConfigurer {
    4. @Override
    5. public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
    6. //websocket地址以及处理器,可以多个
    7. registry.addHandler(new WebSocketEndPointHandler(), "/websocket/log/getLog")
    8. .setAllowedOrigins("*")
    9. //自定义的 HandshakeHandler实现类会报错,springboot默认需要用 AbstractHandshakeHandler
    10. //.setHandshakeHandler(new WebSocketHandshakeHandler())
    11. .addInterceptors(new MessageHandshakeInterceptor()); //设置拦截器
    12. }
    13. }

    前端测试代码

            我的前端代码是在自己的搭建的一个以vue为基础的项目中测试的。

    1. <template>
    2. <div class="f_c float_l">
    3. <div class="m_10">
    4. <el-input class='input_common' v-model="websocketPath" placeholder="请输入后端websocket地址" >el-input>
    5. <el-button v-if="!connected" @click=" initWebSocket" >连接el-button>
    6. <el-button v-else @click="disConnectWebsocket" >断开el-button>
    7. div>
    8. <div class="f_c" v-if="connected">
    9. <el-button @click="sendMessage">发送el-button>
    10. <el-input class='input_common mt_10' v-model="message" type="textarea" :rows="2" placeholder="请输入需要发送的消息">el-input>
    11. div>
    12. <el-divider />
    13. <div class="m_10 f_c" v-if="connected">
    14. <span class="float_l">收到消息span>
    15. <span style="border: aqua; width: 500px; height: 100px">{{receiveMessage}}span>
    16. div>
    17. div>
    18. template>
    19. <script>
    20. export default {
    21. name: "index",
    22. data(){
    23. return {
    24. websocketPath:'localhost:7000/websocket-demo/websocket/log/getLog',
    25. receiveMessage:'',
    26. message:'',
    27. connected: false,
    28. ws:null,
    29. number:0,
    30. heartbeatIntervalId: null, //客户端心跳定时发送
    31. timeoutId: null //定时检测服务端发送过来的心跳
    32. }
    33. },
    34. methods: {
    35. initWebSocket() {
    36. this.ws = new WebSocket('ws://' + this.websocketPath, ['123456789']);
    37. this.ws.onmessage = this.websocketOnmessage;
    38. this.ws.onopen = this.websocketOpen;
    39. this.ws.onerror = this.websocketError;
    40. this.ws.onclose = this.websocketOnclose;
    41. },
    42. websocketOpen() {
    43. console.log('websocket onopen', this.ws.readyState);
    44. this.connected = true;
    45. // 开始心跳检测
    46. this.startHeartbeat();
    47. },
    48. websocketError(event) {
    49. console.log('websocket onerror', this.ws.readyState);
    50. this.connected =false
    51. clearInterval(this.heartbeatIntervalId);
    52. clearTimeout(this.timeoutId);
    53. },
    54. websocketOnclose(event){
    55. // WebSocket关闭时的处理
    56. console.log('WebSocket disconnected');
    57. // 清除心跳检测定时器
    58. clearInterval(this.heartbeatIntervalId);
    59. clearTimeout(this.timeoutId);
    60. this.connected = false;
    61. },
    62. websocketOnmessage(event) {
    63. console.log(`收到消息:${event.data}`);
    64. let nowMessage = '第' + this.number + '次收到订阅的消息:' + event.data + '\r\n';
    65. this.receiveMessage += nowMessage;
    66. this.number++
    67. this.resetHeartbeat();
    68. },
    69. sendMessage() {
    70. if (this.ws.readyState === WebSocket.OPEN) {
    71. let param = {
    72. message: this.message
    73. }
    74. this.ws.send(JSON.stringify(param));
    75. }
    76. },
    77. startHeartbeat() {
    78. let _that = this
    79. //开始心跳检测
    80. this.heartbeatIntervalId = setInterval( function () {
    81. //根据实际情况发送正确的心跳消息
    82. _that.ws.send('HEARTBEAT');
    83. }, 8 * 1000)
    84. this.timeoutId = setTimeout(() => {
    85. console.error('Heartbeat timeout, reconnecting...');
    86. console.log(this.ws)
    87. this.ws.close();
    88. }, 15 * 1000)
    89. },
    90. resetHeartbeat() {
    91. clearTimeout(this.timeoutId);
    92. let _that = this
    93. this.timeoutId = window.setTimeout(function() {
    94. console.error('Heartbeat timeout, reconnecting...');
    95. _that.ws.close();
    96. }, 15 * 1000);
    97. },
    98. disConnectWebsocket(){
    99. this.ws.close();
    100. clearInterval(this.heartbeatIntervalId);
    101. clearTimeout(this.timeoutId);
    102. }
    103. }
    104. }
    105. script>
    106. <style scoped>
    107. style>

    启动之后就可以填写我们的websocket地址。就可以连接了。

  • 相关阅读:
    【剑指Offer】45. 把数组排成最小的数
    freeswitch之媒体协商模式
    LeetCode题目笔记——6225. 差值数组不同的字符串,Python 32ms
    有哪些值得推荐的优秀 HTML&CSS 网站前端设计的网络资源(博客、论坛)?
    聊一聊redis奇葩数据类型与集群知识
    Spring事务管理 | 数据库连接池流程原理分析
    Mybatis-Plus多种批量插入方案对比
    为什么通过一致性正则化方法就可以避免将所有未标记数据集分配给同一类?
    图像文件格式与数据存储/传输格式详解
    【浏览器缓存机制】
  • 原文地址:https://blog.csdn.net/qq_34484062/article/details/139065993