• 解决websocket不定时出现1005错误


    后台抛出异常如下:

    1. Operator called default onErrorDropped
    2. reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: WebSocket close status code does NOT comply with RFC-6455: 1005
    3. Caused by: java.lang.IllegalArgumentException: WebSocket close status code does NOT comply with RFC-6455: 1005

    分析原因是:

    Spring Cloud Gateway 转发 WebSocket 请求时无法监听到正常的 close 事件,没有收到预期的状态码(即 1005);而 1005 属于保留状态码,不能写入关闭帧,gateway 将其原样转发给另一端时便抛出上述异常。

    解决方案:

    在 gateway 中自定义一个 GlobalFilter 拦截 WebSocket 请求,在向另一端转发关闭状态码之前,把 1005 等不合规的状态码替换为合法的状态码。

    代码如下:

    1. @Slf4j
    2. @Component
    3. public class CustomWebsocketRoutingFilter implements GlobalFilter, Ordered {
    4. //Sec-Websocket protocol
    5. public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";
    6. //Sec-Websocket header
    7. public static final String SEC_WEBSOCKET_HEADER = "sec-websocket";
    8. //http header schema
    9. public static final String HEADER_UPGRADE_WebSocket = "websocket";
    10. public static final String HEADER_UPGRADE_HTTP = "http";
    11. public static final String HEADER_UPGRADE_HTTPS = "https";
    12. private final WebSocketClient webSocketClient;
    13. private final WebSocketService webSocketService;
    14. private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;
    15. // 不直接使用 headersFilters 用该变量代替
    16. private volatile List<HttpHeadersFilter> headersFilters;
    17. public CustomWebsocketRoutingFilter(WebSocketClient webSocketClient, WebSocketService webSocketService, ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider) {
    18. this.webSocketClient = webSocketClient;
    19. this.webSocketService = webSocketService;
    20. this.headersFiltersProvider = headersFiltersProvider;
    21. }
    22. /* for testing */
    23. //http请求转为ws请求
    24. static String convertHttpToWs(String scheme) {
    25. scheme = scheme.toLowerCase();
    26. return "http".equals(scheme) ? "ws" : "https".equals(scheme) ? "wss" : scheme;
    27. }
    28. @Override
    29. public int getOrder() {
    30. // Before NettyRoutingFilter since this routes certain http requests
    31. //修改了这里 之前是-1 降低优先级
    32. return Ordered.LOWEST_PRECEDENCE - 2;
    33. }
    34. @Override
    35. public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    36. changeSchemeIfIsWebSocketUpgrade(exchange);
    37. URI requestUrl = exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
    38. String scheme = requestUrl.getScheme();
    39. if (ServerWebExchangeUtils.isAlreadyRouted(exchange) || (!"ws".equals(scheme) && !"wss".equals(scheme))) {
    40. return chain.filter(exchange);
    41. }
    42. ServerWebExchangeUtils.setAlreadyRouted(exchange);
    43. HttpHeaders headers = exchange.getRequest().getHeaders();
    44. HttpHeaders filtered = HttpHeadersFilter.filterRequest(getHeadersFilters(), exchange);
    45. List<String> protocols = getProtocols(headers);
    46. return this.webSocketService.handleRequest(exchange, new ProxyWebSocketHandler(requestUrl, this.webSocketClient, filtered, protocols));
    47. }
    48. /* for testing */
    49. //获取请求头里的协议信息
    50. List<String> getProtocols(HttpHeaders headers) {
    51. List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL);
    52. if (protocols != null) {
    53. ArrayList<String> updatedProtocols = new ArrayList<>();
    54. for (int i = 0; i < protocols.size(); i++) {
    55. String protocol = protocols.get(i);
    56. updatedProtocols.addAll(Arrays.asList(StringUtils.tokenizeToStringArray(protocol, ",")));
    57. }
    58. protocols = updatedProtocols;
    59. }
    60. return protocols;
    61. }
    62. /* for testing */
    63. List<HttpHeadersFilter> getHeadersFilters() {
    64. if (this.headersFilters == null) {
    65. this.headersFilters = this.headersFiltersProvider.getIfAvailable(ArrayList::new);
    66. // remove host header unless specifically asked not to
    67. this.headersFilters.add((headers, exchange) -> {
    68. HttpHeaders filtered = new HttpHeaders();
    69. filtered.addAll(headers);
    70. filtered.remove(HttpHeaders.HOST);
    71. boolean preserveHost = exchange.getAttributeOrDefault(ServerWebExchangeUtils.PRESERVE_HOST_HEADER_ATTRIBUTE, false);
    72. if (preserveHost) {
    73. String host = exchange.getRequest().getHeaders().getFirst(HttpHeaders.HOST);
    74. filtered.add(HttpHeaders.HOST, host);
    75. }
    76. return filtered;
    77. });
    78. this.headersFilters.add((headers, exchange) -> {
    79. HttpHeaders filtered = new HttpHeaders();
    80. for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
    81. if (!entry.getKey().toLowerCase().startsWith(SEC_WEBSOCKET_HEADER)) {
    82. filtered.addAll(entry.getKey(), entry.getValue());
    83. }
    84. }
    85. return filtered;
    86. });
    87. }
    88. return this.headersFilters;
    89. }
    90. static void changeSchemeIfIsWebSocketUpgrade(ServerWebExchange exchange) {
    91. // 检查版本是否适合
    92. URI requestUrl = exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
    93. String scheme = requestUrl.getScheme().toLowerCase();
    94. String upgrade = exchange.getRequest().getHeaders().getUpgrade();
    95. // change the scheme if the socket client send a "http" or "https"
    96. if (HEADER_UPGRADE_WebSocket.equalsIgnoreCase(upgrade) && (HEADER_UPGRADE_HTTP.equals(scheme) || HEADER_UPGRADE_HTTPS.equals(scheme))) {
    97. String wsScheme = convertHttpToWs(scheme);
    98. boolean encoded = ServerWebExchangeUtils.containsEncodedParts(requestUrl);
    99. URI wsRequestUrl = UriComponentsBuilder.fromUri(requestUrl).scheme(wsScheme).build(encoded).toUri();
    100. exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, wsRequestUrl);
    101. if (log.isTraceEnabled()) {
    102. log.trace("changeSchemeTo:[" + wsRequestUrl + "]");
    103. }
    104. }
    105. }
    106. //自定义websocket处理方式
    107. private static class ProxyWebSocketHandler implements WebSocketHandler {
    108. private final WebSocketClient client;
    109. private final URI url;
    110. private final HttpHeaders headers;
    111. private final List<String> subProtocols;
    112. ProxyWebSocketHandler(URI url, WebSocketClient client, HttpHeaders headers, List<String> protocols) {
    113. this.client = client;
    114. this.url = url;
    115. this.headers = headers;
    116. if (protocols != null) {
    117. this.subProtocols = protocols;
    118. } else {
    119. this.subProtocols = Collections.emptyList();
    120. }
    121. }
    122. @Override
    123. public List<String> getSubProtocols() {
    124. return this.subProtocols;
    125. }
    126. @Override
    127. public Mono<Void> handle(WebSocketSession session) {
    128. return this.client.execute(this.url, this.headers, new WebSocketHandler() {
    129. private CloseStatus adaptCloseStatus(CloseStatus closeStatus) {
    130. int code = closeStatus.getCode();
    131. if (code > 2999 && code < 5000) {
    132. return closeStatus;
    133. }
    134. switch (code) {
    135. case 1000:
    136. //正常关闭
    137. return closeStatus;
    138. case 1001:
    139. //服务器挂了或者页面跳转
    140. return closeStatus;
    141. case 1002:
    142. //协议错误
    143. return closeStatus;
    144. case 1003:
    145. //收到了不能处理的数据类型
    146. return closeStatus;
    147. case 1004:
    148. // 预留关闭状态码
    149. return CloseStatus.PROTOCOL_ERROR;
    150. case 1005:
    151. // 预留关闭状态码 期望收到状态码但是没有收到
    152. return CloseStatus.PROTOCOL_ERROR;
    153. case 1006:
    154. // 预留关闭状态码 连接异常关闭
    155. return CloseStatus.PROTOCOL_ERROR;
    156. case 1007:
    157. //收到的数据与实际的消息类型不匹配
    158. return closeStatus;
    159. case 1008:
    160. //收到不符合规则的消息
    161. return closeStatus;
    162. case 1009:
    163. //收到太大的不能处理的消息
    164. return closeStatus;
    165. case 1010:
    166. //client希望server提供多个扩展,server没有返回相应的扩展信息
    167. return closeStatus;
    168. case 1011:
    169. //server遇到不能完成的请求
    170. return closeStatus;
    171. case 1012:
    172. // Not in RFC6455
    173. // return CloseStatus.SERVICE_RESTARTED;
    174. return CloseStatus.PROTOCOL_ERROR;
    175. case 1013:
    176. // Not in RFC6455
    177. // return CloseStatus.SERVICE_OVERLOAD;
    178. return CloseStatus.PROTOCOL_ERROR;
    179. case 1015:
    180. // 不能进行TLS握手 如:server证书不能验证
    181. return CloseStatus.PROTOCOL_ERROR;
    182. default:
    183. return CloseStatus.PROTOCOL_ERROR;
    184. }
    185. }
    186. /**
    187. * send 发送传出消息
    188. * receive 处理入站消息流
    189. * doOnNext 对每条消息做什么
    190. * zip 加入流
    191. * then 返回接收完成时完成的Mono<Void>
    192. */
    193. @Override
    194. public Mono<Void> handle(WebSocketSession proxySession) {
    195. Mono<Void> serverClose = proxySession.closeStatus().filter(__ -> session.isOpen())
    196. .map(this::adaptCloseStatus)
    197. .flatMap(session::close);
    198. Mono<Void> proxyClose = session.closeStatus().filter(__ -> proxySession.isOpen())
    199. .map(this::adaptCloseStatus)
    200. .flatMap(proxySession::close);
    201. // Use retain() for Reactor Netty
    202. Mono<Void> proxySessionSend = proxySession
    203. .send(session.receive().doOnNext(WebSocketMessage::retain));
    204. Mono<Void> serverSessionSend = session
    205. .send(proxySession.receive().doOnNext(WebSocketMessage::retain));
    206. // Ensure closeStatus from one propagates to the other
    207. Mono.when(serverClose, proxyClose).subscribe();
    208. // Complete when both sessions are done
    209. return Mono.zip(proxySessionSend, serverSessionSend).then();
    210. }
    211. @Override
    212. public List<String> getSubProtocols() {
    213. return CustomWebsocketRoutingFilter.ProxyWebSocketHandler.this.subProtocols;
    214. }
    215. });
    216. }
    217. }
    218. }

  • 相关阅读:
    Java中的dozer对象转换
    JDBC笔记
    Altassian | Bitbucket Python API使用及相关任务自动化的实现
    【C++】class的设计与使用(十)重载iostream运算符
    sql根据出生日期计算当前年龄 函数TIMESTAMPDIFF()
    广度优先搜索(Breadth First Search, BFS)算法
    编译nw-node版本的插件
    故障分析 | OceanBase Proxy 无法连接 OBserver 集群
    需求分析岗的一般工作流程
    sfm算法之三角化(三角测量)
  • 原文地址:https://blog.csdn.net/vigor512/article/details/132620369