• Netty-实验


    Netty应用实例-群聊系统

    实例要求:

    (1)编写一个Netty群聊系统,实现服务端和客户端之间的数据简单通讯(非阻塞)
    (2)实现多人群聊
    (3)服务器端:可以监视用户上线,离线,并实现消息转发功能
    (4)客户端:通过channel可以物阻塞发送消息给其他用户,同时可以接受其他用户发送的消息(由服务器转发的得到)
    (5)目的:进一步理解Netty非阻塞网络编程机制

    拆解过程:

    首先我们建立GroupChatServer端,然后重写对应的具体业务处理的handler,在handler中具体需要捕捉的是客户端channel的上线、下线、具体对应的就是handlerAdded、handlerRemoved、channelActive、channelInactive,分别是处理发送给其他客户端的消息和服务器捕捉发送消息,最重要的就是读消息并转发,对于发送消息的channel的处理和其他客户端不一样。

    然后我们建立GroupChatClient端,不同的就是我们的消息需要响应输入,然后发送,对于handler只需要读取消息打印即可。

    GroupchatServer实现:
    package com.sgg.Netty.GroupChat;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    public class GroupchatServer {
        private int port;
    
        public GroupchatServer(int port){
            this.port = port;
        }
    
        public void run() throws InterruptedException {
            //创建线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            ServerBootstrap b = new ServerBootstrap();
            try{
                b.group(bossGroup,workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG,128)       //设置线程队列的连接个数
                        .childOption(ChannelOption.SO_KEEPALIVE,true)    //设置保持活动连接状态
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                pipeline.addLast("decoder",new StringDecoder());
                                pipeline.addLast("encoder", new StringEncoder());
                                pipeline.addLast(new GroupchatServerhandler());
                            }
                        });
                System.out.println("服务器启动");
                ChannelFuture cf = b.bind(port).sync();
                cf.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if(cf.isSuccess()){
                            System.out.println("监听端口成功");
                        }else{
                            System.out.println("监听端口失败");
                        }
                    }
                });
    
                cf.channel().closeFuture().sync();
            }finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
    
    
        }
        public static void main(String[] args) throws InterruptedException {
            new GroupchatServer(8847).run();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    GroupchatServerhandler实现:
    package com.sgg.Netty.GroupChat;
    
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.concurrent.GlobalEventExecutor;
    
    import java.text.SimpleDateFormat;
    
    public class GroupchatServerhandler extends SimpleChannelInboundHandler<String> {
    
    
        //定义一个channel组,管理所有的channel
        private static ChannelGroup channelGroup= new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy--mm-dd HH:mm:ss");
    
        /**
         *处理所有在线客户端某新客户加入消息
         */
        //handlerAdded 表示连接建立,一旦连接,第一个被执行
        //将当前channelchannelGroup
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            //将该客户端加入聊天的消息推送给其他在线的客户端
            //该方法会将channelGroup中所有的channel遍历,并发送消息
            channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"加入聊天\n");
            channelGroup.add(channel);
        }
    
        /**
         *处理所有在线客户端某新客户离开的消息
         */
        //断开连接,将XX酷互动离开的信息推送给当前所有在线的客户
        //出发该方法时ChannelGroup中会自动删除该channenl,不需手动删除
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            //将该客户端离开聊天的消息推送给其他在线的客户端
            //该方法会将channelGroup中所有的channel遍历,并发送消息
            channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"离开聊天\n");
        }
    
        /**
         *处理服务端显示新客户上线消息
         */
        //表示channel处于活动状态,提示XX上线
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(ctx.channel().remoteAddress()+"上线了");
        }
    
        /**
         *处理服务端显示新客户上下线消息
         */
        //表示channel处于不活动状态,提示XX下线
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(ctx.channel().remoteAddress()+"下线了~");
        }
    
        //读取数据并转发给当前在线的所有人
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
            Channel channel = ctx.channel();
    
            channelGroup.forEach(ch->{
                if(channel!=ch){
                    ch.writeAndFlush("[客户】"+channel.remoteAddress()+"发送了消息"+s+"\n");
                }else{
                    ch.writeAndFlush("[自己]发送了消息"+s+"\n");
                }
            });
        }
    
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    GroupchatClient实现:
    package com.sgg.Netty.GroupChat;
    
    import com.sgg.Netty.simple.NettyClienthandler;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    import java.util.Scanner;
    
    public class GroupchatClient {
        private final String host;
        private final int port;
        public GroupchatClient(String host, int port){
            this.host = host;
            this.port = port;
        }
        public void run() throws InterruptedException {
            EventLoopGroup group = new NioEventLoopGroup();
    
            Bootstrap bootstrap = new Bootstrap();
    
            try {
                bootstrap.group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                pipeline.addLast("decoder",new StringDecoder());
                                pipeline.addLast("encoder",new StringEncoder());
                                pipeline.addLast(new GroupchatCinenthandler());//加入自己的handler
                            }
                        });
    //            System.out.println("客户端OK");
    
                ChannelFuture channelFuture = bootstrap.connect(host,port).sync();
                Channel channel = channelFuture.channel();
                System.out.println("---------"+channel.localAddress()+"--------");
                //客户端需要输入信息
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNextLine()){
                    String msg = scanner.nextLine();
                    //通过channel发送出去
                    channel.writeAndFlush(msg+"\r\n");
                }
    
            }finally {
                //关闭
                group.shutdownGracefully();
            }
    
        }
    
        public static void main(String[] args) throws InterruptedException {
            new GroupchatClient("127.0.0.1",8847).run();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    GroupchatClienthandler实现:
    package com.sgg.Netty.GroupChat;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    public class GroupchatCinenthandler extends SimpleChannelInboundHandler<String> {
    
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
            System.out.println(s.trim());
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    到此,我们实现了群聊系统如下:
    GroupchatServer
    在这里插入图片描述
    GrouchatClient1
    在这里插入图片描述
    GrouchatClient2
    在这里插入图片描述
    GrouchatClient3
    在这里插入图片描述

    Netty心跳机制实验

    我们先来看看心跳机制的产生
    我们知道在TCP长连接或者WebSocket长连接中一般我们都会使用心跳机制–即发送特殊的数据包来通告对方自己的业务还没有办完,不要关闭链接。

    那么心跳机制可以用来做什么呢?

    我们知道网络的传输是不可靠的,当我们发起一个链接请求的过程之中会发生什么事情谁都无法预料,或者断电,服务器重启,断网线之类。

    如果有这种情况的发生对方也无法判断你是否还在线。所以这时候我们引入心跳机制,在长链接中双方没有数据交互的时候互相发送数据(可能是空包,也可能是特殊数据),对方收到该数据之后也回复相应的数据用以确保双方都在线,这样就可以确保当前链接是有效的。

    1. 如何实现心跳机制

    一般实现心跳机制由两种方式:

    TCP协议自带的心跳机制来实现;
    在应用层来实现。
    但是TCP协议自带的心跳机制系统默认是设置的是2小时的心跳频率。它检查不到机器断电、网线拔出、防火墙这些断线。而且逻辑层处理断线可能也不是那么好处理。另外该心跳机制是与TCP协议绑定的,那如果我们要是使用UDP协议岂不是用不了?所以一般我们都不用。

    而一般我们自己实现呢大致的策略是这样的:

    Client启动一个定时器,不断发送心跳;
    Server收到心跳后,做出回应;
    Server启动一个定时器,判断Client是否存在,这里做判断有两种方法:时间差和简单标识。
    时间差:

    收到一个心跳包之后记录当前时间;
    判断定时器到达时间,计算多久没收到心跳时间=当前时间-上次收到心跳时间。如果改时间大于设定值则认为超时。
    简单标识:

    收到心跳后设置连接标识为true;
    判断定时器到达时间,如果未收到心跳则设置连接标识为false;

    Netty中的心跳机制的实现:

    我们来看一下Netty的心跳机制的实现,在Netty中提供了IdleStateHandler类来进行心跳的处理,它可以对一个 Channel 的 读/写设置定时器, 当 Channel 在一定事件间隔内没有数据交互时(即处于 idle 状态), 就会触发指定的事件。
    该类可以对三种类型的超时做心跳机制检测:

    • readerIdleTimeSeconds:设置读超时时间;
    • writerIdleTimeSeconds:设置写超时时间;
    • allIdleTimeSeconds:同时为读或写设置超时时间;

    下面我们进行心跳机制的实验,我们只需要写服务端即可,客户端我们可以用上面的群聊实验中的客户端即可。
    MyServer:

    package com.sgg.Netty.HeatBeat;
    
    import com.sgg.Netty.simple.NettyServerhandler;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.IdleStateHandler;
    
    import java.util.concurrent.TimeUnit;
    
    public class MyServer {
        public static void main(String[] args) throws InterruptedException {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            ServerBootstrap bootstrap = new ServerBootstrap();
    
            try{
                //使用链式变成来进行设置
                bootstrap.group(bossGroup,workerGroup)       //设置两个线程组
                        .channel(NioServerSocketChannel.class)     //使用NioServerSocketChannel作为服务器的通道实现
                        .option(ChannelOption.SO_BACKLOG,128)       //设置线程队列的连接个数
                        .childOption(ChannelOption.SO_KEEPALIVE,true)       //设置保持活动连接状态
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {         //创建一个通道测试对象
                            //给pipeline设置处理器
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                //加入一个Netty提供的IdleStateHandler
                                /**
                                 * 1、IdleStateHandler是netty提供的处理空闲状态的处理器
                                 * 2、long readerIdleTime: 表示多长时间没有读,就会发送一个心跳监测包检测是否连接
                                 * 3、long writerIdleTime: 表示多长时间没有写,就会发送一个心跳监测包检测是否连接
                                 * 4、long allIdleTime: 表示多长时间没有读和写,就会发送一个心跳监测包检测是否连接
                                 * 5、当IdleStateHandler触发后,就会传递给管道的下一个handler去处理,
                                 * 通过调用(触发)下一个handler的userEventTiggered,在该方法中去处理
                                 */
    
                                pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));
                                pipeline.addLast(new MyServerhandler());
                            }
                        });
    
                ChannelFuture cf = bootstrap.bind(8847).sync();
                //给cf注册监听器,监控我们关心的事件
                cf.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if(cf.isSuccess()){
                            System.out.println("监听端口成功");
                        }else{
                            System.out.println("监听端口失败");
                        }
                    }
                });
    
                //对关闭通道进行监听
                cf.channel().closeFuture().sync();
            }finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    MyServerhandler:

    package com.sgg.Netty.HeatBeat;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.timeout.IdleState;
    import io.netty.handler.timeout.IdleStateEvent;
    
    public class MyServerhandler extends ChannelInboundHandlerAdapter {
    
        /**
         * 心跳机制触发后的处理函数
         * @param ctx 上下文
         * @param evt 心跳机制发生的事件
         * @throws Exception
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if(evt instanceof IdleStateEvent){
                //将evt向下转型 IdleStateEvent
                IdleStateEvent event  = (IdleStateEvent)evt;
                String eventType = null;
                switch (event.state()){
                    case READER_IDLE:
                        eventType = "读空闲";
                        break;
                    case WRITER_IDLE:
                        eventType = "写空闲";
                        break;
                    case ALL_IDLE:
                        eventType = "读写空闲";
                        break;
                }
                System.out.println(ctx.channel().remoteAddress()+"------"+eventType);
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    实验结果:
    在这里插入图片描述
    我们在handler中实现的userEventTriggered方法中我们可以捕捉到空闲事件,后续根据需要即可进行响应的处理

  • 相关阅读:
    EvaluLLM: LLM Assisted Evaluation of Generative Outputs论文阅读
    【C++笔记】第三篇 关键字和标识符
    实战教程:如何将自己的Python包发布到PyPI上
    C++笔记-八股
    JS面试题----防抖函数
    多目标优化算法:基于非支配排序的霸王龙优化算法(NSTROA)MATLAB
    thinkphp5 如何模拟在apifox里面 post数据接收
    九、【VUE-CLI】浏览器本地存储(待办事项案例 · 第二版)
    深入篇【C++】总结<lambda表达式>与<包装器和bind>的使用与意义
    复杂的连接如何破坏智能家居体验
  • 原文地址:https://blog.csdn.net/XZB119211/article/details/127871879