• java-使用netty实现客户端向服务端存储数据的简单实现


    可以使用write:1,实现想服务端存储1的数据,使用read命令可以读取刚才的写入的数据,效果如下
    服务端

    read
    read is inputs
    请输入消息:
    客户端读取服务返回的数据:1
    客户端读取服务返回的数据:服务端已经读取消息完成!read
    write:999
    write:999 is inputs
    请输入消息:
    客户端读取服务返回的数据:服务端已经读取消息完成!write:999
    read
    read is inputs
    请输入消息:
    客户端读取服务返回的数据:999
    客户端读取服务返回的数据:服务端已经读取消息完成!read
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    package sample.netty.newtest;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.LineBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.util.CharsetUtil;
    
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;
    
    public class nettyServerTest {
    
        private int port;
    
        public nettyServerTest(int port) {
            this.port = port;
        }
    
        public void run() throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap(); // (2)
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class) // (3)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                                channel.pipeline().addLast(new StringDecoder());
                                channel.pipeline().addLast(new NettyServerHandler());
                            }
                        })
                        .option(ChannelOption.SO_BACKLOG, 128)          // (5)
                        .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
                // Bind and start to accept incoming connections.
                ChannelFuture f = b.bind(port).sync(); // (7)
    
                // Wait until the server socket is closed.
                // In this example, this does not happen, but you can do that to gracefully
                // shut down your server.
                f.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    
        public class NettyServerHandler extends ChannelInboundHandlerAdapter { // (1)
            int currentValue=0;
            String pubMSG;
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException { // (2)
                String byteBuf = (String) msg;
                pubMSG=byteBuf;
                System.out.println("客户端发来的消息是:" + byteBuf);
    
                if(byteBuf.contains("write:")){
                    currentValue=Integer.parseInt(byteBuf.split(":")[1]);
                }
                if(byteBuf.contains("read")){
                    ctx.writeAndFlush(Unpooled.copiedBuffer(currentValue+"\r\n", CharsetUtil.UTF_8));
                }
            }
    
            //数据读取完毕事件
            public void channelReadComplete(ChannelHandlerContext ctx) throws IOException, InterruptedException {
                //数据读取完毕,将信息包装成一个Buffer传递给下一个Handler,Unpooled.copiedBuffer会返回一个Buffer
                //调用的是事件处理器的上下文对象的writeAndFlush方法
                //意思就是说将  你好  传递给了下一个handler
                ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已经读取消息完成!"+pubMSG+"\r\n", CharsetUtil.UTF_8));
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
                // Close the connection when an exception is raised.
                cause.printStackTrace();
                ctx.close();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 1003;
    //        if (args.length > 0) {
    //            port = Integer.parseInt(args[0]);
    //        }
            new nettyServerTest(port).run();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97

    客户端
    package sample.netty.newtest;

    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LineBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.util.CharsetUtil;

    import java.io.IOException;
    import java.util.Scanner;

    public class nettyClient {
    ChannelHandlerContext ctxOut;
    public static void main(String[] args) throws Exception {
    NettyClientHandlerInner nettyClientHandlerInner=new NettyClientHandlerInner();

        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    Scanner scanner=new Scanner(System.in);
                    System.out.println("请输入消息:");
                    String inputs=scanner.nextLine();
                    System.out.println(inputs+" is inputs");
                    nettyClientHandlerInner.sendMSG(inputs);
                }
            }
        }).start();
    
        connectToServer(nettyClientHandlerInner);
        System.out.println(222222);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    // while (true){
    // Scanner scanner=new Scanner(System.in);
    // String inputs=scanner.nextLine();
    // nettyClientHandlerInner.ctxOut.writeAndFlush(Unpooled.copiedBuffer(inputs+“\r\n”, CharsetUtil.UTF_8));;
    // }

    }
    
    public static void connectToServer(NettyClientHandlerInner nettyClientHandler){
        EventLoopGroup workerGroup = new NioEventLoopGroup();
    
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(nettyClientHandler);
                }
            });
    
            // Start the client.
            ChannelFuture f = b.connect("127.0.0.1", 1003).sync(); // (5)
    
            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    }

    //客户端业务处理类
    class NettyClientHandlerInner extends ChannelInboundHandlerAdapter {
    ChannelHandlerContext ctxOut;
    //通道就绪事件(就是在bootstrap启动助手配置中addlast了handler之后就会触发此事件)
    //但我觉得也可能是当有客户端连接上后才为一次通道就绪
    public void channelActive(ChannelHandlerContext ctx) throws IOException, InterruptedException {
    System.out.println(“客户端消息,通道激活,可以发送消息了”);
    ctxOut=ctx;
    }

    //数据读取事件
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //传来的消息包装成字节缓冲区
        String byteBuf = (String) msg;
    
    • 1
    • 2
    • 3
    • 4

    // ByteBuf byteBuf = (ByteBuf) msg;
    //Netty提供了字节缓冲区的toString方法,并且可以设置参数为编码格式:CharsetUtil.UTF_8
    System.out.println(“客户端读取服务返回的数据:” + byteBuf);
    }

    public void  sendMSG(String msg){
        ctxOut.writeAndFlush(Unpooled.copiedBuffer(msg+"\r\n", CharsetUtil.UTF_8));
    }
    
    • 1
    • 2
    • 3

    }

  • 相关阅读:
    Vue3 学习
    2022/11/22 [easyx]关于字符和一些背景
    SpringCloud的nacos多项目、多环境的命名空间和分组配置
    大数据Apache Druid(五):Druid批量数据加载
    【Kafka】Kafka生产者数据重复、数据有序、数据乱序-07
    阿里云部署SpringBoot项目启动后被杀进程的问题
    【Python机器学习】零基础掌握VotingClassifier集成学习
    科普系列:AUTOSAR与OSEK网络管理比较(上)
    因为做了这样的项目,成为了offer收割机!
    [javaweb]jsp,jstl,el表达式的使用
  • 原文地址:https://blog.csdn.net/m0_60688978/article/details/134550000