• 通过netty实现scoket客户端


    客户端发送16进制给服务端,并行实现socket通道活动状态和断开重新连接的功能, 监听接口是否存在数据,如果存在socket客户端发送给socket服务端的实现 随着物联网的发展,随之出现了各种传感器监测数据的实时发送,需要和netty服务器通讯,netty和传感器之间需要保持长连接(换句话说,netty和gateway之间都会主动给对方发送消息)

        netty 问题总结
        当服务因为心跳反应断开时,在  userEventTriggered 方法中关闭一次后, 会执行 channelInactive 接口,也就是说,如果进行心跳检测,
        但是客户端主动关闭后,只会调用channelInactive 的接口

         在netty 中,主要涉及到几个内容
      服务端 (姑且认为只有一个)
      客户端 (可以有多个)
      channel 通道 (双向通道 )   是入站或者出站的载体。    服务器端的通道和客户端的通道并不是一个通道                  
      channelpipeline 对输入输出的管理
      下面这两个相当于过滤器或者拦截器之类的。   许多类都是从这上面扩展而来
      ChannelInboundHandlerAdapter        输入的过滤
      ChannelOutboundHandlerAdapter    输出的过滤
      主要理解的是通道,一个客户端连接一个服务器,产生一个通道,所有的输入输出都是对通道的操作。

    public class NettyClient {
         private String host;
            private int port;
            /** 存放客户端bootstrap对象 */
            private Bootstrap bootstrap;
            /** 客户端 */
            private EventLoopGroup group;
            /** 存放客户端channel对象 */
            private Channel channel;
            public static void main(String[] args) throws Exception {
                //NettyClient nettyClient = new NettyClient("222.91.99.38", 5100);
                NettyClient nettyClient = new NettyClient("localhost", 5100);
                nettyClient.connect();
            }

            public NettyClient(String host, int port) {
                this.host = host;
                this.port = port;
                init();
            }

            private void init() {
                //客户端需要一个事件循环组
                group = new NioEventLoopGroup();
                //创建客户端启动对象
                // bootstrap 可重用, 只需在NettyClient实例化的时候初始化即可.
                bootstrap = new Bootstrap();
                bootstrap.group(group)
                        .channel(NioSocketChannel.class)/// 设置nio双向通道
                        .option(ChannelOption.SO_KEEPALIVE, true)
                        .handler(new ChannelInitializer() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                //加入处理器
                                //服务端设定IdleStateHandler心跳检测每五秒进行一次读检测,如果五秒内ChannelRead()方法未被调用则触发一次userEventTrigger()方法
                                //ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
                                
                                //客户端 设定IdleStateHandler心跳检测每四秒进行一次写检测,如果四秒内write()方法未被调用则触发一次userEventTrigger()方法,实现客户端每四秒向服务端发送一次消息;
                                ch.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS));
                                /*readerIdleTime:读空闲超时时间
                                writerIdleTime:写空闲超时时间
                                allIdleTime:读和写都空闲的超时时间*/
                       
                                ch.pipeline().addLast(new NettyClientHandler(NettyClient.this));
                            }
                        });
            }
            
          //服务端
           /* ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            //客户端
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);*/
            /**
             * 连接到服务端
             * @throws InterruptedException 中断异常
             */
            public void doConnect() throws InterruptedException {
                log.info("Start connecting server");
                if (channel != null && channel.isActive()) {
                    return;
                }
                bootstrap.connect(host, port).addListener(new ChannelListener(host,port)).sync().channel();
            }
            /**
             * 重新连接
             * @param serverIp 连接的IP
             *
             */
            public void reConnect() {
                try {
                    log.info("Start reconnect to server." + host + ":" + port);
                    if (channel != null && channel.isOpen()) {
                       
                        channel.close();
                    }
                    bootstrap.connect(new InetSocketAddress(host, port))
                    .addListener(new ChannelListener(host,port)).sync().channel();
                } catch (Exception e) {
                    log.info("ReConnect to server failure.server=" + host + ":" + port + ":" + e.getMessage());
                }
            }

            public  void close() {
                if (channel != null && channel.isOpen()) {
                        channel.close();
                }
                bootstrap.clone();
                group.shutdownGracefully();
                
                
            }
            

            public void connect() throws Exception {
                System.out.println("netty client start。。");
                //启动客户端去连接服务器端
                ChannelFuture cf = bootstrap.connect(host, port);
                cf.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            //重连交给后端线程执行
                            future.channel().eventLoop().schedule(() -> {
                                System.err.println("重连服务端...");
                                try {
                                    connect();
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            }, 20, TimeUnit.SECONDS);
                        } else {
                            System.out.println("服务端连接成功...");
                        }
                    }
                });
                //对通道关闭进行监听
                cf.channel().closeFuture().sync();

            }


    }

    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
        @Autowired
        private NettyClient nettyClient;

        public NettyClientHandler(NettyClient nettyClient) {
            this.nettyClient = nettyClient;
        }

        /**
         * 当客户端连接服务器完成就会触发该方法
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
           
            
            ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
            System.err.println("当客户端连接服务器完成就会触发该方法。。。");
            String str="48 47 12 00 87 00 16 09 02 11 10 26 21 30 30 30 30 30 30 30 30 30 30 60 57 08 B5 0D 71 8E 74 94 5E 30 29 15 D1 9A D4 AA C2 FD 1E 53 E9 3A E1 CA F5 CF A2 6A 6A 74 2B 8F 87 B6 DF A1 5B CA 05 03 1F 3F 86 CF 8A 0C 05 85 7D 94 65 DE 12 4A A5 E3 EF 46 CD DE C8 13 5F 4C 17 2C 08 41 8F 31 99 44 55 AA B5 A5 A8 25 2A 8D 97 FC 22 76 11 50 02 67 05 30 D0 1D 5B 51 5A A1 11";
            byte[] bytes = hexStringToByteArray(str);
            ByteBuf bufx = Unpooled.copiedBuffer(bytes);
            //UnpooledHeapByteBuf.class
            ctx.writeAndFlush(bufx);
            Channel channel = ctx.channel();
            log.info("---map---"+channel);
            ChannelMap.addChannel("typeid",ctx.channel());
            log.info("客户端连接成功: client address :{}"+ channel.remoteAddress());
            
        }
        public  byte[] hexStringToByteArray(String hexString) {
            hexString = hexString.replaceAll(" ", "");
            int len = hexString.length();
            byte[] bytes = new byte[len / 2];
            for (int i = 0; i < len; i += 2) {
                // 两位一组,表示一个字节,把这样表示的16进制字符串,还原成一个字节
                bytes[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4) + Character
                        .digit(hexString.charAt(i + 1), 16));
            }
            return bytes;
        }

        //当通道有读取事件时会触发,即服务端发送数据给客户端
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("--收到服务端的消息--"+msg.toString());

            ByteBuf buf = (ByteBuf) msg;
            byte[] bytes = new byte[buf.readableBytes()];

            // 复制内容到字节数组bytes

            
            buf.readBytes(bytes);
            String bytesToHexString = bytesToHexString(bytes);
            String retx =Arrays.toString(bytes);
            List splitToList_r080928_val = Splitter.on(",").splitToList(retx);
            List result= new ArrayList<>();
            splitToList_r080928_val.forEach(x->{
                 
                 result.add(x);
            });
            System.out.println("--收到服务端的消息bytesToHexString--"+bytesToHexString);
            System.out.println("--收到服务端的消息--"+retx);
            //System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));
            System.out.println("服务端的地址: " + ctx.channel().remoteAddress());
            // 保存当前连接
            //在服务器端EchoServerHandler中的ChannelRead中保存当前的连接
            ChannelMap.addChannel(bytesToHexString,ctx.channel());
            System.out.println("服务端的地址 map: " + ChannelMap.getChannelHashMap());

        }
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) 
                throws Exception {
            System.out.println("channelReadComplete: channel读数据完成");
            super.channelReadComplete(ctx);
        }

        /**
         * 将接收到的数据转换为16进制
         * @param bytes
         * @return
         */
        public static String bytesToHexString(byte[] bytes) {
            StringBuilder sb = new StringBuilder();
            List hexList=new ArrayList();
            for (int i = 0; i < bytes.length; i++) {
                String hex = Integer.toHexString(0xFF & bytes[i]);
                if (hex.length() == 1) {
                    hex="0"+hex;
                    hexList.add(hex);
                    sb.append('0');
                }else{
                    hexList.add(hex);
                }
                sb.append(hex);
            }
             String join = Joiner.on(",").skipNulls().join(hexList);
            return join;
        }

        //客户端与服务端断开连接时调用
        // channel 处于不活动状态时调用
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.err.println("运行中断开重连。。。");
            nettyClient.connect();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
        
        /*
         * readerIdleTime:读空闲超时时间
    writerIdleTime:写空闲超时时间
    allIdleTime:读和写都空闲的超时时间
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
                throws Exception {
            System.err.println("发送心跳,保持长连接。。。");
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state().equals(IdleState.READER_IDLE)) {
                    System.out.println("READER_IDLE");
                } else if (event.state().equals(IdleState.WRITER_IDLE)) {
                    /**发送心跳,保持长连接*/
                    String s = "ping$_";
                    ctx.channel().writeAndFlush(s);
                    System.out.println("心跳发送成功!");
                } else if (event.state().equals(IdleState.ALL_IDLE)) {
                    System.out.println("ALL_IDLE");
                }
            }
            super.userEventTriggered(ctx, evt);
        }

  • 相关阅读:
    C#理论 —— WPF 应用程序&Console 控制台应用
    Supervised Machine Learning Regression and Classification(吴恩达机器学习课程笔记)
    神经网络 深度神经网络,深度神经网络简单介绍
    WordPress(5)在主题中添加文章字数和预计阅读时间
    FPGA底层资源综述
    Windows10神州网信版的USB故障处理(设备描述符请求失败)
    别再低效筛选数据了!试试pandas query函数
    【数据结构初阶】一. 复杂度讲解
    学习Node js:raw-body模块源码解析
    npm install卡住与node-npy的各种奇怪报错
  • 原文地址:https://blog.csdn.net/ruiguang21/article/details/126785520