• 网络编程-NIO案例 与 AIO 案例


    案例说明:一个简单的群聊实现,支持重复上下线。

    NIO

    服务端

    /**
     * Minimal NIO group-chat server.
     *
     * <p>Listens on port 9999, registers every accepted client with a single selector and
     * relays each message it reads to all other connected clients. Clients may connect and
     * disconnect repeatedly; a disconnect (clean or abrupt) only removes that client.
     *
     * <p>Text is encoded and decoded with the platform default charset, matching what the
     * accompanying client does.
     */
    public class NIOServer {
        /**
         * Runs the selector loop until the process is terminated.
         *
         * @param args unused
         * @throws IOException if the listening socket or selector cannot be set up, or if a
         *                     selection operation itself fails
         */
        public static void main(String[] args) throws IOException {
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            // Initialise the listening socket.
            serverChannel.bind(new InetSocketAddress(9999));
            Selector selector = Selector.open();
            serverChannel.configureBlocking(false);
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                // Wake up at least every two seconds to look for ready channels.
                if (selector.select(2000) == 0) {
                    continue;
                }
                System.out.println("===================");
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey next = it.next();
                    // Remove first so that no code path can leave a handled key in the selected set.
                    it.remove();
                    if (!next.isValid()) {
                        // The channel was closed or the key cancelled while handling an earlier key.
                        continue;
                    }
                    if (next.isAcceptable()) {
                        acceptClient(selector, serverChannel);
                    } else if (next.isReadable()) {
                        readAndRelay(selector, next);
                    }
                }
            }
        }

        /** Accepts one pending connection and registers it for reads with a fresh 1 KiB buffer. */
        private static void acceptClient(Selector selector, ServerSocketChannel serverChannel) {
            SocketChannel channel = null;
            try {
                channel = serverChannel.accept();
                if (channel == null) {
                    // Non-blocking accept: the pending connection is already gone.
                    return;
                }
                channel.configureBlocking(false);
                channel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                String ipStr = channel.getRemoteAddress().toString().substring(1);
                System.out.println(ipStr + "上线 ...");
            } catch (IOException e) {
                // A failed accept must not stop the server; drop the half-set-up client only.
                e.printStackTrace();
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException ignored) {
                        // Nothing more can be done for a channel that fails to close.
                    }
                }
            }
        }

        /** Reads what is available on the key's channel and relays it; takes the client offline on EOF or error. */
        private static void readAndRelay(Selector selector, SelectionKey key) {
            SocketChannel channel = (SocketChannel) key.channel();
            ByteBuffer buffer = (ByteBuffer) key.attachment();
            try {
                int read = channel.read(buffer);
                if (read < 0) {
                    // End of stream: the peer closed the connection. Without closing here the key
                    // would be reported readable on every select and spin the loop.
                    System.out.println(channel.getRemoteAddress().toString().substring(1) + "下线 ...");
                    closeQuietly(key);
                    return;
                }
                if (buffer.position() == 0) {
                    // Nothing was read; do not broadcast an empty message.
                    return;
                }
                String msg = new String(buffer.array(), 0, buffer.position());
                buffer.clear();
                System.out.println("receive : " + msg);
                // Relay the message to everyone else.
                broadCast(selector, channel, msg);
            } catch (Exception e) {
                System.out.println("======================发生异常进行下线操作=========");
                closeQuietly(key);
            }
        }

        /**
         * Sends {@code msg} to every client registered with {@code selector} except the sender.
         *
         * <p>Keys whose channel is already closed are cancelled. A recipient whose write fails is
         * closed and skipped, so one broken recipient does not affect the sender or the others.
         *
         * @param selector selector whose registered keys define the set of recipients
         * @param channel  the sending client's channel, which is excluded from the broadcast
         * @param msg      text to send, encoded with the platform default charset
         * @throws IOException kept for source compatibility with existing callers
         */
        public static void broadCast(Selector selector, SocketChannel channel, String msg) throws IOException {
            byte[] payload = msg.getBytes();
            for (SelectionKey key : selector.keys()) {
                SelectableChannel targetChannel = key.channel();
                if (!targetChannel.isOpen()) {
                    // The channel no longer exists: take it offline.
                    key.cancel();
                    continue;
                }
                if (!(targetChannel instanceof SocketChannel) || targetChannel == channel) {
                    // Skip the listening channel and the sender itself.
                    continue;
                }
                try {
                    // NOTE(review): a non-blocking write may be partial when the recipient's send
                    // buffer is full; the remainder is dropped here, as in the original code.
                    ((SocketChannel) targetChannel).write(ByteBuffer.wrap(payload));
                } catch (IOException e) {
                    closeQuietly(key);
                }
            }
        }

        /** Cancels the key and closes its channel, ignoring any error raised by the close. */
        private static void closeQuietly(SelectionKey key) {
            key.cancel();
            try {
                key.channel().close();
            } catch (IOException ignored) {
                // The client is being dropped anyway.
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79

    客户端

    /**
     * Chat client built on a non-blocking {@link SocketChannel}.
     *
     * <p>Text is encoded and decoded with the platform default charset.
     */
    public class NIOClient {
        private SocketChannel channel;
        private String userName;

        private String bindIP;
        private int bindPort;

        /**
         * Connects to the chat server and switches the channel to non-blocking mode.
         *
         * @param userName name prefixed to every outgoing message
         * @param bindIP   server host name or IP address
         * @param bindPort server port
         * @throws IOException if the connection cannot be established
         */
        public NIOClient(String userName, String bindIP, int bindPort) throws IOException {
            this.userName = userName;
            this.bindIP = bindIP;
            this.bindPort = bindPort;
            channel = SocketChannel.open();
            try {
                // Connect while the channel is still blocking: this waits for the handshake
                // without spinning on finishConnect(), then switch to non-blocking for reads.
                channel.connect(new InetSocketAddress(bindIP, bindPort));
                channel.configureBlocking(false);
            } catch (IOException e) {
                try {
                    channel.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
        }

        /**
         * Sends one chat message, or closes the connection when {@code msg} is {@code "end"}.
         *
         * @param msg message text; the literal text {@code end} closes the channel instead
         * @throws IOException if closing or writing fails
         */
        public void sendMsg(String msg) throws IOException {
            // Compare by content; identity comparison only matches interned literals.
            if ("end".equals(msg)) {
                channel.close();
                return;
            }
            msg = "from " + this.userName + " : " + msg;
            channel.write(ByteBuffer.wrap(msg.getBytes()));
        }

        /**
         * Performs one non-blocking read and prints whatever arrived, trimmed; prints nothing
         * when no bytes were read.
         *
         * @throws IOException if the read fails, for example because the channel is closed
         */
        public void receive() throws IOException {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int size = channel.read(buffer);
            if (size > 0) {
                // Decode only the bytes actually read, not the whole backing array.
                String msg = new String(buffer.array(), 0, size);
                System.out.println(msg.trim());
            }
        }
    }
    // Main 函数
    /**
     * Console driver for {@code NIOClient}: connects as user "one", polls for incoming
     * messages on a background thread and sends each line typed on standard input until
     * the user types {@code end} or standard input is exhausted.
     *
     * @param args unused
     * @throws IOException declared for compatibility; failures are rethrown unchecked from the worker thread
     */
    public static void main(String[] args) throws IOException {
        new Thread(() -> {
            final NIOClient nioClient;
            try {
                nioClient = new NIOClient("one", "127.0.0.1", 9999);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            Thread thread = new Thread(() -> {
                try {
                    while (true) {
                        nioClient.receive();
                        Thread.sleep(3000);
                    }
                } catch (java.nio.channels.ClosedChannelException e) {
                    // The connection was closed during shutdown: same outcome as an interrupt.
                    System.out.println("=============== 离线 ===================");
                } catch (IOException e) {
                    throw new RuntimeException(e);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.out.println("=============== 离线 ===================");
                }

            });
            thread.start();
            System.out.println( "one pleas input : ");
            Scanner scanner = new Scanner(System.in);
            try {
                // hasNextLine() ends the loop cleanly on end of input instead of throwing.
                while (scanner.hasNextLine()) {
                    String msg = scanner.nextLine();
                    if (msg.equals("end")) {
                        break;
                    }
                    nioClient.sendMsg(msg);
                }
            } catch (IOException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            } finally {
                // Always stop the receiver; otherwise this non-daemon thread keeps the JVM alive.
                thread.interrupt();
                try {
                    // "end" asks the client to close its channel.
                    nioClient.sendMsg("end");
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    Netty(原文此处标题为 AIO;以下示例实际使用的是基于 NIO 的 Netty 框架,而不是 JDK 自带的 AIO)

    /**
     * Entry point of the Netty chat server: binds port 9999 and installs a string
     * decoder, a string encoder and a {@code NettyChatServerHandler} on every accepted
     * connection.
     */
    public class NettyServer {
        /**
         * Starts the server and blocks until the listening channel is closed, then shuts
         * both event loop groups down.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            NioEventLoopGroup acceptGroup = new NioEventLoopGroup();
            NioEventLoopGroup ioGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap server = new ServerBootstrap();
                server.group(acceptGroup, ioGroup);
                server.channel(NioServerSocketChannel.class);
                server.option(ChannelOption.SO_BACKLOG, 128);
                server.childOption(ChannelOption.SO_KEEPALIVE, true);
                server.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // Order matters: decode inbound bytes, encode outbound strings, then chat logic.
                        ch.pipeline()
                                .addLast("decoder", new StringDecoder())
                                .addLast("encoder", new StringEncoder())
                                .addLast(new NettyChatServerHandler());
                    }
                });
                ChannelFuture bound = server.bind(9999).sync();
                // Park the main thread until the listening channel goes away.
                bound.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                acceptGroup.shutdownGracefully();
                ioGroup.shutdownGracefully();
                System.out.println("关闭");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    /**
     * Server-side chat handler: tracks every active channel and relays each received
     * line to all other channels, prefixed with the sender's remote address.
     */
    public class NettyChatServerHandler extends SimpleChannelInboundHandler<String> {
        /**
         * All currently connected client channels.
         *
         * <p>The list is shared by every handler instance and is iterated while other
         * connections may be adding or removing themselves, so a copy-on-write list is
         * used to keep iteration safe.
         */
        public static List<Channel> channels = new java.util.concurrent.CopyOnWriteArrayList<>();

        /** Registers the newly connected channel and logs that it is online. */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            channels.add(channel);
            System.out.println(channel.remoteAddress().toString().substring(1) + " online");
        }

        /**
         * Relays the received text to every other connected channel.
         *
         * @param ctx context of the channel the message arrived on (the sender)
         * @param s   decoded message text
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
            Channel channel = ctx.channel();
            // The prefix identifies who spoke, so it is built from the sender's address,
            // not from the address of the channel being written to.
            String line = "[" + channel.remoteAddress().toString().substring(1) + "]" + "said:" + s + "\n";
            for (Channel ch : channels) {
                if (ch != channel) {
                    ch.writeAndFlush(line);
                }
            }
        }

        /** Unregisters the disconnected channel and logs that it went offline. */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            channels.remove(channel);
            System.out.println(channel.remoteAddress().toString().substring(1) + " off line");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    /**
     * Netty chat client: connects to the configured host and port and sends every line
     * typed on standard input to the server.
     */
    public class ChatClient {
        private String host;
        private int port;

        /**
         * @param host server host name or IP address
         * @param port server port
         */
        public ChatClient(String host, int port) {
            this.host = host;
            this.port = port;
        }

        /**
         * Connects, forwards standard input line by line, then waits for the connection to
         * close before shutting the event loop group down. Failures are printed, not rethrown.
         */
        public void run() {
            NioEventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                pipeline.addLast("decoder", new StringDecoder());
                                pipeline.addLast("encoder", new StringEncoder());
                                pipeline.addLast(new NettyClientHandler());
                            }
                        });
                // Connect to the address given to the constructor rather than a hard-coded one.
                ChannelFuture sync = bootstrap.connect(host, port).sync();
                Channel channel = sync.channel();
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNextLine()) {
                    String msg = scanner.nextLine();
                    // Terminate each message with a real CRLF, not the literal characters "\r\n".
                    channel.writeAndFlush(msg + "\r\n").sync();
                }
                sync.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }

        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    /** Client-side handler that prints every decoded message received from the server. */
    public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
        /**
         * Prints the received text to standard output with surrounding whitespace removed.
         *
         * @param ctx context of the channel the text arrived on (unused)
         * @param s   decoded message text
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
            String text = s.trim();
            System.out.println(text);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
  • 相关阅读:
    JSR303参数校验(2)
    一文带你理解@RefreshScope注解实现动态刷新原理
    【Java】常用的文件操作
    RFSoC应用笔记 - RF数据转换器 -14- RFSoC自动增益控制与NCO跳频功能
    k8s架构浅析
    Vue2项目练手——通用后台管理项目第五节
    Vosviewer的安装与使用
    竞赛 题目:基于深度学习卷积神经网络的花卉识别 - 深度学习 机器视觉
    vue axios请求两种方式,出现401错误,需要添加config配置
    【网络研究院】机器学习系统的威胁是时候该认真对待了
  • 原文地址:https://blog.csdn.net/qq_43259860/article/details/136258909