断连重试有以下两点考虑:
1、连接异常,比如网络抖动导致连接失败;
2、连接过程中断开连接重试;
主要用到两个工具类:
ChannelFutureListener:监听 ChannelFuture 的完成结果,通过 isSuccess() 判断连接是否成功;
ChannelInboundHandlerAdapter重写channelInactive,当连接变为不活跃,则回调该方法。
完整代码如下:
- @Component
- public class WebSocketClient {
-
- private Channel channel;
-
- private Bootstrap bootstrap;
-
- private URI uri;
-
- private MessageHandler messageHandler;
-
- private WebSocketClientHandler handler;
-
- private volatile AtomicInteger atomicCount = new AtomicInteger(0);
-
- public WebSocketClient initClient(String host, MessageHandler messageHandler) throws Exception {
- this.messageHandler = messageHandler;
- if (StringUtils.isEmpty(host)) {
- throw new RuntimeException("未配置host.");
- }
- uri = new URI(host);
- String scheme = uri.getScheme() == null? WssSchemeEnum.WS.getValue() : uri.getScheme();
-
- //判断是否ssl连接,如果是则设置为可信
- final boolean ssl = WssSchemeEnum.WSS.getValue().equalsIgnoreCase(scheme);
- final SslContext sslCtx;
- if (ssl) {
- sslCtx = SslContextBuilder.forClient()
- .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
- } else {
- sslCtx = null;
- }
-
- EventLoopGroup group = new NioEventLoopGroup();
- try {
- bootstrap = new Bootstrap();
- bootstrap.group(group)
- .channel(NioSocketChannel.class)
- .handler(new ChannelInitializer
() { - @Override
- protected void initChannel(SocketChannel ch) {
- ChannelPipeline p = ch.pipeline();
- p.addFirst(new ChannelInboundHandlerAdapter() {
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- log.error("【{}】检测到wss断连, 第 {} 次发起重连.", exchange, atomicCount.incrementAndGet());
- super.channelInactive(ctx);
- ctx.channel().eventLoop().schedule(WebSocketClient.this::doConnect, 3000, TimeUnit.MILLISECONDS);
- }
- });
- if (sslCtx != null) {
- p.addLast(sslCtx.newHandler(ch.alloc(), uri.getHost(), getUriPort(uri)));
- }
- p.addLast(new HttpClientCodec());
- p.addLast(new HttpObjectAggregator(8192));
- p.addLast(WebSocketClientCompressionHandler.INSTANCE);
- handler = new WebSocketClientHandler(
- WebSocketClientHandshakerFactory.newHandshaker(
- uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()), exchange, messageHandler);
- p.addLast(handler);
- }
- });
- } catch (Exception e) {
- log.error("wss创建client异常. e:", e);
- if (bootstrap != null) {
- bootstrap.config().group().shutdownGracefully();
- }
- throw new RuntimeException("初始化wss连接异常. e: " + e);
- }
- doConnect();
- return this;
- }
-
- public void doConnect() {
- try {
- ChannelFuture future = bootstrap.connect(uri.getHost(), getUriPort(uri)).sync();
- handler.handshakeFuture().sync();
- future.addListener((ChannelFutureListener) cf -> {
- if (future.isSuccess()) {
- channel = future.channel();
- WssManger.addChannel(exchange, channel);
- log.info("连接成功.");
- messageHandler.connectSuccessAction(future.channel());
- atomicCount.set(0);
- } else {
- log.error("监听断连, wss第 {} 次发起重连. ", atomicCount.incrementAndGet());
- future.channel().eventLoop().schedule(WebSocketClient.this::doConnect, 3000, TimeUnit.MILLISECONDS);
- }
- });
- }catch (Exception e) {
- log.error("连接异常. e:" + e);
- if (bootstrap != null) {
- log.info("wss连接异常,第 {} 次发起重连.", atomicCount.incrementAndGet());
- bootstrap.config().group().schedule(WebSocketClient.this::doConnect, 3000, TimeUnit.MILLISECONDS);
- }
- }
- }
-
- /**
- * 根据URI获取对应的port
- *
- * @param uri uri
- * @return port
- */
- private int getUriPort(URI uri) {
- String scheme = uri.getScheme() == null? WssSchemeEnum.WS.getValue() : uri.getScheme();
- if (!WssSchemeEnum.allScheme().contains(scheme)) {
- throw new RuntimeException("Only WS(S) is supported.");
- }
- if (uri.getPort() == -1) {
- if (WssSchemeEnum.WS.getValue().equalsIgnoreCase(scheme)) {
- return WssSchemeEnum.WS.getPort();
- } else if (WssSchemeEnum.WSS.getValue().equalsIgnoreCase(scheme)) {
- return WssSchemeEnum.WSS.getPort();
- } else {
- return -1;
- }
- } else {
- return uri.getPort();
- }
- }
- }