• Netty 心跳机制及断线重连


    1、心跳检测

    心跳检测是在TCP长连接中,客户端和服务端定时向对方发送数据包通知对方自己还在线,保证连接的有效性的一种机制。

    为什么使用心跳检测?

    • 假死:如果底层的TCP连接(socket连接)已经断开,但是服务端并没有正常关闭套接字,服务端认为这条TCP连接仍然是存在的。因为每个连接都会耗费CPU和内存资源,因此大量假死的连接会逐渐耗光服务器的资源,使得服务器越来越慢,IO处理效率越来越低,最终导致服务器崩溃。所以使用心跳检测处理这些假死的客户端

    如何处理假死?

    • 客户端定时进行心跳检测、服务端定时进行空闲检测。

    空闲检测就是每隔一段时间检测子通道是否有数据读写,如果有,则子通道是正常的;如果没有,则子通道被判定为假死,关掉子通道。

    1.1、Netty心跳检测及空闲检测

    IdleStateHandler

    • 添加IdleStateHandler心跳检测处理器,并添加自定义处理Handler实现userEventTriggered()方法作为超时事件的逻辑处理;

    • 如果设定IdleStateHandler心跳检测

      • 服务端:readerIdleTime每五秒进行一次读检测,设定时间内ChannelRead()方法未被调用则触发一次userEventTrigger()方法
      • 客户端:writerIdleTime每五秒进行一次读检测,设定时间内write()方法未被调用则触发一次userEventTrigger()方法

    IdleStateHandler 构造方法参数

    • readerIdleTime:为读超时时间
    • writerIdleTime:为写超时时间
    • allIdleTime:所有类型的超时时间

    在这里插入图片描述

    //IdleStateHandler的readerIdleTime参数指定超过3秒还没收到客户端的连接,
    //会触发IdleStateEvent事件并且交给下一个handler处理,下一个handler必须
    //实现userEventTriggered方法处理对应事件
    pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
    
    • 1
    • 2
    • 3
    • 4

    Handler重写userEventTriggered方法

    
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent event = (IdleStateEvent) evt;
    
        String eventType = null;
        switch (event.state()) {
            case READER_IDLE:
                eventType = "读空闲";
                readIdleTimes++; // 读空闲的计数加1
                break;
            case WRITER_IDLE:
                eventType = "写空闲";
                // 不处理
                break;
            case ALL_IDLE:
                eventType = "读写空闲";
                // 不处理
                break;
        }
        System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType);
        if (readIdleTimes > 3) {
            System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源");
            ctx.channel().writeAndFlush("idle close");
            ctx.channel().close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    在这里插入图片描述

    2、断线重连

    netty 的服务端一般情况下不需要断线重连,应为服务端服务宕机就只能重新启动服务;所以今天我们研究的是客户端的断线重连;

    断线重连是指由于发生网络故障而导致服务中断的情况,客户端就需要从重新连接服务端;

    Netty客户端添加监听添加监听后如果连接中断会调用operationComplete方法

    
    import com.example.netty.idle.HeartBeatClient;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.EventLoop;
    import java.util.concurrent.TimeUnit;
    
    public class ConnectionListener  implements ChannelFutureListener {
    
        @Override
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (!channelFuture.isSuccess()) {
                final EventLoop loop = channelFuture.channel().eventLoop();
                loop.schedule(new Runnable() {
                    @Override
                    public void run() {
                        System.err.println("服务端链接不上,开始重连操作...");
                        HeartBeatClient.Connection.connect();
                    }
                }, 1L, TimeUnit.SECONDS);
            } else {
                System.err.println("服务端链接成功...");
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioChannelOption;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.timeout.IdleStateEvent;
    import io.netty.handler.timeout.IdleStateHandler;
    import java.util.concurrent.TimeUnit;
    
    //服务端代码
    public class HeartBeatServer {
    
        public static void main(String[] args) throws Exception {
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(boss, worker)
                        .channel(NioServerSocketChannel.class)
                        .childOption(NioChannelOption.SO_KEEPALIVE,true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
    
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast("decoder", new StringDecoder());
                                pipeline.addLast("encoder", new StringEncoder());
                                //IdleStateHandler的readerIdleTime参数指定超过3秒还没收到客户端的连接,
                                //会触发IdleStateEvent事件并且交给下一个handler处理,下一个handler必须
                                //实现userEventTriggered方法处理对应事件
                                pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
                                pipeline.addLast(new HeartBeatServerHandler());
                            }
                        });
                System.out.println("netty server start。。");
                ChannelFuture future = bootstrap.bind(9000).sync();
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                worker.shutdownGracefully();
                boss.shutdownGracefully();
            }
        }
    
        //服务端处理handler
        public static class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {
    
            int readIdleTimes = 0;
    
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
                System.out.println(" ====== > [server] message received : " + s);
                if ("Heartbeat Packet".equals(s)) {
                    ctx.channel().writeAndFlush("ok");
                } else {
                    System.out.println(" 其他信息处理 ... ");
                }
            }
    
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                IdleStateEvent event = (IdleStateEvent) evt;
    
                String eventType = null;
                switch (event.state()) {
                    case READER_IDLE:
                        eventType = "读空闲";
                        readIdleTimes++; // 读空闲的计数加1
                        break;
                    case WRITER_IDLE:
                        eventType = "写空闲";
                        // 不处理
                        break;
                    case ALL_IDLE:
                        eventType = "读写空闲";
                        // 不处理
                        break;
                }
                System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType);
                if (readIdleTimes > 3) {
                    System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源");
                    ctx.channel().writeAndFlush("idle close");
                    ctx.channel().close();
                }
            }
    
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    
    import com.example.netty.config.ConnectionListener;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    
    //客户端代码
    public class HeartBeatClient {
        public static void main(String[] args) throws Exception {
            Connection.connect();
        }
        public static class Connection{
    
            public static void connect(){
                EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
                try {
                    Bootstrap bootstrap = new Bootstrap();
                    bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                            .handler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                protected void initChannel(SocketChannel ch) throws Exception {
                                    ChannelPipeline pipeline = ch.pipeline();
                                    pipeline.addLast("decoder", new StringDecoder());
                                    pipeline.addLast("encoder", new StringEncoder());
                                    pipeline.addLast(new HeartBeatClientHandler());
                                }
                            });
    
                    System.out.println("netty client start。。");
                    ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
                    Channel channel = channelFuture.channel();
                    String text = "Heartbeat Packet";
                    Random random = new Random();
                    while (channel.isActive()) {
                        int num = random.nextInt(10);
                        Thread.sleep(num * 1000);
                        channel.writeAndFlush(text);
                    }
                    // 添加监听后 如果连接中断会调用GenericFutureListener中operationComplete方法(子类实现)
                    channelFuture.addListener(new ConnectionListener());
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    eventLoopGroup.shutdownGracefully();
                }
            }
    
        }
    
        static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {
    
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                System.out.println(" client received :" + msg);
                if (msg != null && msg.equals("idle close")) {
                    System.out.println(" 服务端关闭连接,客户端也关闭");
                    ctx.channel().closeFuture();
                }
            }
    
    
            @Override
            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                System.err.println("channelInactive 掉线了...");
                //使用过程中断线重连
                final EventLoop eventLoop = ctx.channel().eventLoop();
                eventLoop.schedule(new Runnable() {
                    @Override
                    public void run() {
                        HeartBeatClient.Connection.connect();
                    }
                }, 1L, TimeUnit.SECONDS);
                super.channelInactive(ctx);
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84

    在这里插入图片描述

  • 相关阅读:
    Java手写归并排序和案例拓展
    Kafka3.x核心知识速查手册-一、快速上手篇
    VMware 安装Ubuntu22.04
    Linux进阶-ipc共享内存
    c++小游戏(更新中)
    记一次springboot的@RequestBody json值注入失败的问题(字段大小写的问题)
    CoinGecko 播客:与 Cartesi 联合创始人 Erick 一起构建 Layer-2
    【Spring MVC研究】DispatcherServlet如何处理请求(doDispatcher方法)
    设计模式 - 单例模式理解及相关问题解决方法
    【C++】函数重载 ① ( 函数重载概念 | 函数重载判断标准 - 参数个数 / 类型 / 顺序 | 返回值不是函数重载判定标准 )
  • 原文地址:https://blog.csdn.net/Extraordinarylife/article/details/126131244