• netty websockt之断连重试


    断连重试有以下两点考虑:

    1、连接异常,比如网络抖动导致连接失败;

    2、连接过程中断开连接重试;

    主要用到两个工具类:

    ChannelFutureListener监听ChannelFuture..isSuccess();

    ChannelInboundHandlerAdapter重写channelInactive,当连接变为不活跃,则回调该方法。

    完整代码如下:

    1. @Component
    2. public class WebSocketClient {
    3. private Channel channel;
    4. private Bootstrap bootstrap;
    5. private URI uri;
    6. private MessageHandler messageHandler;
    7. private WebSocketClientHandler handler;
    8. private volatile AtomicInteger atomicCount = new AtomicInteger(0);
    9. public WebSocketClient initClient(String host, MessageHandler messageHandler) throws Exception {
    10. this.messageHandler = messageHandler;
    11. if (StringUtils.isEmpty(host)) {
    12. throw new RuntimeException("未配置host.");
    13. }
    14. uri = new URI(host);
    15. String scheme = uri.getScheme() == null? WssSchemeEnum.WS.getValue() : uri.getScheme();
    16. //判断是否ssl连接,如果是则设置为可信
    17. final boolean ssl = WssSchemeEnum.WSS.getValue().equalsIgnoreCase(scheme);
    18. final SslContext sslCtx;
    19. if (ssl) {
    20. sslCtx = SslContextBuilder.forClient()
    21. .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
    22. } else {
    23. sslCtx = null;
    24. }
    25. EventLoopGroup group = new NioEventLoopGroup();
    26. try {
    27. bootstrap = new Bootstrap();
    28. bootstrap.group(group)
    29. .channel(NioSocketChannel.class)
    30. .handler(new ChannelInitializer() {
    31. @Override
    32. protected void initChannel(SocketChannel ch) {
    33. ChannelPipeline p = ch.pipeline();
    34. p.addFirst(new ChannelInboundHandlerAdapter() {
    35. @Override
    36. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    37. log.error("【{}】检测到wss断连, 第 {} 次发起重连.", exchange, atomicCount.incrementAndGet());
    38. super.channelInactive(ctx);
    39. ctx.channel().eventLoop().schedule(WebSocketClient.this::doConnect, 3000, TimeUnit.MILLISECONDS);
    40. }
    41. });
    42. if (sslCtx != null) {
    43. p.addLast(sslCtx.newHandler(ch.alloc(), uri.getHost(), getUriPort(uri)));
    44. }
    45. p.addLast(new HttpClientCodec());
    46. p.addLast(new HttpObjectAggregator(8192));
    47. p.addLast(WebSocketClientCompressionHandler.INSTANCE);
    48. handler = new WebSocketClientHandler(
    49. WebSocketClientHandshakerFactory.newHandshaker(
    50. uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()), exchange, messageHandler);
    51. p.addLast(handler);
    52. }
    53. });
    54. } catch (Exception e) {
    55. log.error("wss创建client异常. e:", e);
    56. if (bootstrap != null) {
    57. bootstrap.config().group().shutdownGracefully();
    58. }
    59. throw new RuntimeException("初始化wss连接异常. e: " + e);
    60. }
    61. doConnect();
    62. return this;
    63. }
    64. public void doConnect() {
    65. try {
    66. ChannelFuture future = bootstrap.connect(uri.getHost(), getUriPort(uri)).sync();
    67. handler.handshakeFuture().sync();
    68. future.addListener((ChannelFutureListener) cf -> {
    69. if (future.isSuccess()) {
    70. channel = future.channel();
    71. WssManger.addChannel(exchange, channel);
    72. log.info("连接成功.");
    73. messageHandler.connectSuccessAction(future.channel());
    74. atomicCount.set(0);
    75. } else {
    76. log.error("监听断连, wss第 {} 次发起重连. ", atomicCount.incrementAndGet());
    77. future.channel().eventLoop().schedule(WebSocketClient.this::doConnect, 3000, TimeUnit.MILLISECONDS);
    78. }
    79. });
    80. }catch (Exception e) {
    81. log.error("连接异常. e:" + e);
    82. if (bootstrap != null) {
    83. log.info("wss连接异常,第 {} 次发起重连.", atomicCount.incrementAndGet());
    84. bootstrap.config().group().schedule(WebSocketClient.this::doConnect, 3000, TimeUnit.MILLISECONDS);
    85. }
    86. }
    87. }
    88. /**
    89. * 根据URI获取对应的port
    90. *
    91. * @param uri uri
    92. * @return port
    93. */
    94. private int getUriPort(URI uri) {
    95. String scheme = uri.getScheme() == null? WssSchemeEnum.WS.getValue() : uri.getScheme();
    96. if (!WssSchemeEnum.allScheme().contains(scheme)) {
    97. throw new RuntimeException("Only WS(S) is supported.");
    98. }
    99. if (uri.getPort() == -1) {
    100. if (WssSchemeEnum.WS.getValue().equalsIgnoreCase(scheme)) {
    101. return WssSchemeEnum.WS.getPort();
    102. } else if (WssSchemeEnum.WSS.getValue().equalsIgnoreCase(scheme)) {
    103. return WssSchemeEnum.WSS.getPort();
    104. } else {
    105. return -1;
    106. }
    107. } else {
    108. return uri.getPort();
    109. }
    110. }
    111. }

  • 相关阅读:
    通信协议综述
    经典算法题12-贪心算法
    PWN入门(3)覆盖堆栈上的变量
    【LeetCode刷题-滑动窗口】--487.最大连续1的个数II
    DW网页作业成品下载后导入Dreamweaver如何新建站点及导出站点
    Linux——(第十章)进程管理
    qt 移植到vs后,常见问题汇总????
    Idea下面git的使用:变基、合并、优选、还原提交、重置、回滚、补丁
    金仓数据库KingbaseES数据库参考手册(服务器配置参数14. 版本和平台兼容性)
    QML控件类型:TabBar
  • 原文地址:https://blog.csdn.net/xiazaizhuanyong1231/article/details/134461371