• Netty WebSocket 之断连重试


    断连重试有以下两点考虑:

    1、连接异常,比如网络抖动导致连接失败;

    2、连接过程中断开连接重试;

    主要用到两个工具类:

    ChannelFutureListener:监听 ChannelFuture 的完成结果,通过 isSuccess() 判断连接是否成功;

    ChannelInboundHandlerAdapter重写channelInactive,当连接变为不活跃,则回调该方法。

    完整代码如下:

    1. @Component
    2. public class WebSocketClient {
    3. private Channel channel;
    4. private Bootstrap bootstrap;
    5. private URI uri;
    6. private MessageHandler messageHandler;
    7. private WebSocketClientHandler handler;
    8. private volatile AtomicInteger atomicCount = new AtomicInteger(0);
    9. public WebSocketClient initClient(String host, MessageHandler messageHandler) throws Exception {
    10. this.messageHandler = messageHandler;
    11. if (StringUtils.isEmpty(host)) {
    12. throw new RuntimeException("未配置host.");
    13. }
    14. uri = new URI(host);
    15. String scheme = uri.getScheme() == null? WssSchemeEnum.WS.getValue() : uri.getScheme();
    16. //判断是否ssl连接,如果是则设置为可信
    17. final boolean ssl = WssSchemeEnum.WSS.getValue().equalsIgnoreCase(scheme);
    18. final SslContext sslCtx;
    19. if (ssl) {
    20. sslCtx = SslContextBuilder.forClient()
    21. .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
    22. } else {
    23. sslCtx = null;
    24. }
    25. EventLoopGroup group = new NioEventLoopGroup();
    26. try {
    27. bootstrap = new Bootstrap();
    28. bootstrap.group(group)
    29. .channel(NioSocketChannel.class)
    30. .handler(new ChannelInitializer() {
    31. @Override
    32. protected void initChannel(SocketChannel ch) {
    33. ChannelPipeline p = ch.pipeline();
    34. p.addFirst(new ChannelInboundHandlerAdapter() {
    35. @Override
    36. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    37. log.error("【{}】检测到wss断连, 第 {} 次发起重连.", exchange, atomicCount.incrementAndGet());
    38. super.channelInactive(ctx);
    39. ctx.channel().eventLoop().schedule(WebSocketClient.this::doConnect, 3000, TimeUnit.MILLISECONDS);
    40. }
    41. });
    42. if (sslCtx != null) {
    43. p.addLast(sslCtx.newHandler(ch.alloc(), uri.getHost(), getUriPort(uri)));
    44. }
    45. p.addLast(new HttpClientCodec());
    46. p.addLast(new HttpObjectAggregator(8192));
    47. p.addLast(WebSocketClientCompressionHandler.INSTANCE);
    48. handler = new WebSocketClientHandler(
    49. WebSocketClientHandshakerFactory.newHandshaker(
    50. uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()), exchange, messageHandler);
    51. p.addLast(handler);
    52. }
    53. });
    54. } catch (Exception e) {
    55. log.error("wss创建client异常. e:", e);
    56. if (bootstrap != null) {
    57. bootstrap.config().group().shutdownGracefully();
    58. }
    59. throw new RuntimeException("初始化wss连接异常. e: " + e);
    60. }
    61. doConnect();
    62. return this;
    63. }
    64. public void doConnect() {
    65. try {
    66. ChannelFuture future = bootstrap.connect(uri.getHost(), getUriPort(uri)).sync();
    67. handler.handshakeFuture().sync();
    68. future.addListener((ChannelFutureListener) cf -> {
    69. if (future.isSuccess()) {
    70. channel = future.channel();
    71. WssManger.addChannel(exchange, channel);
    72. log.info("连接成功.");
    73. messageHandler.connectSuccessAction(future.channel());
    74. atomicCount.set(0);
    75. } else {
    76. log.error("监听断连, wss第 {} 次发起重连. ", atomicCount.incrementAndGet());
    77. future.channel().eventLoop().schedule(WebSocketClient.this::doConnect, 3000, TimeUnit.MILLISECONDS);
    78. }
    79. });
    80. }catch (Exception e) {
    81. log.error("连接异常. e:" + e);
    82. if (bootstrap != null) {
    83. log.info("wss连接异常,第 {} 次发起重连.", atomicCount.incrementAndGet());
    84. bootstrap.config().group().schedule(WebSocketClient.this::doConnect, 3000, TimeUnit.MILLISECONDS);
    85. }
    86. }
    87. }
    88. /**
    89. * 根据URI获取对应的port
    90. *
    91. * @param uri uri
    92. * @return port
    93. */
    94. private int getUriPort(URI uri) {
    95. String scheme = uri.getScheme() == null? WssSchemeEnum.WS.getValue() : uri.getScheme();
    96. if (!WssSchemeEnum.allScheme().contains(scheme)) {
    97. throw new RuntimeException("Only WS(S) is supported.");
    98. }
    99. if (uri.getPort() == -1) {
    100. if (WssSchemeEnum.WS.getValue().equalsIgnoreCase(scheme)) {
    101. return WssSchemeEnum.WS.getPort();
    102. } else if (WssSchemeEnum.WSS.getValue().equalsIgnoreCase(scheme)) {
    103. return WssSchemeEnum.WSS.getPort();
    104. } else {
    105. return -1;
    106. }
    107. } else {
    108. return uri.getPort();
    109. }
    110. }
    111. }

  • 相关阅读:
    prometheus helm install 如何配置告警模版
    【unity小技巧】Unity 存储存档保存——PlayerPrefs、JsonUtility和MySQL数据库的使用
    《德米安》从那以后伤口很痛,但偶尔我会找到钥匙,沉入心底
    【LVS】nat模式+dr模式+防火墙标签解决轮询错误
    Kibana生产上的常用功能总结
    Spring Cloud Alibaba 使用Gateway作为服务网关
    【mindspore连接atlas 200dk】run后显示no matching deployment mapping
    批处理的应用和源码分析
    【EF Core】实体的主、从关系
    【数据聚类】第六章第二节:层次聚类算法之BIRCH算法
  • 原文地址:https://blog.csdn.net/xiazaizhuanyong1231/article/details/134461371