• 基于Netty模拟大量WebSocket客户端


    基于Netty模拟大量WebSocket客户端

    一、概述

    前段时间需要写一个模拟大量客户端的程序来对服务器做压力测试,选用了Netty做为通信框架,通信协议采用了WebSocket,根据官方下载的源码和Demo写完后发现连接几十个连接后就无法继续获取新的Socket通道了,后来经过各种尝试后发现是EventLoopGroup用法不正确导致的,正确用法是所有的Socket连接共用一个EventLoopGroup就可以正常模拟大量的客户端连接了。

    二、错误用法演示

    文中所有代码主要参照Netty官方源码中netty-netty-4.1.84.Final/example/src/main/java/io/netty/example/http/websocketx/client/

    1、启动类

    public class Simulator {
        public static void start() {
            String serverIp = "127.0.0.1";
            int serverPort = 8005;
            for (int i = 0; i < 10000; i++) {
                WebSocketConnector client = new WebSocketConnector(serverIp,serverPort);
                client.doConnect();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    2、WebSocket连接类

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.http.DefaultHttpHeaders;
    import io.netty.handler.codec.http.HttpClientCodec;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
    import io.netty.handler.codec.http.websocketx.WebSocketVersion;
    import io.netty.handler.stream.ChunkedWriteHandler;
    import lombok.extern.slf4j.Slf4j;
    
    import java.net.URI;
    
    /**
     * WebSocket协议类型的模拟客户端连接器类
     *
     * @author duyanjun
     * @since 2022/10/13 杜燕军 新建
     */
    @Slf4j
    public class WebSocketConnector {
        // 服务器ip
        protected String serverIp;
        // 服务器通信端口
        protected int serverSocketPort;
        // 网络通道
        private Channel channel;
    
        /**
         * WebSocket协议类型的模拟客户端连接器构造方法
         *
         * @param serverIp
         * @param serverSocketPort
         */
        public WebSocketConnector(String serverIp,int serverSocketPort) {
            this.serverIp = serverIp;
            this.serverSocketPort = serverSocketPort;
        }
    
        public void doConnect() {
            try {
                String URL = "ws://"+ this.serverIp + ":" + this.serverSocketPort + "/";
                URI uri = new URI(URL);
                final WebSocketIoHandler handler =
                        new WebSocketIoHandler(
                                WebSocketClientHandshakerFactory.newHandshaker(
                                        uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));
                EventLoopGroup group = new NioEventLoopGroup();
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group).channel(NioSocketChannel.class)
                        //.option(ChannelOption.TCP_NODELAY, true)
                        .option(ChannelOption.SO_KEEPALIVE,true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                // 添加一个http的编解码器
                                pipeline.addLast(new HttpClientCodec());
                                // 添加一个用于支持大数据流的支持
                                pipeline.addLast(new ChunkedWriteHandler());
                                // 添加一个聚合器,这个聚合器主要是将HttpMessage聚合成FullHttpRequest/Response
                                pipeline.addLast(new HttpObjectAggregator(1024 * 64));
                                pipeline.addLast(handler);
                            }
                        });
                try {
                    synchronized (bootstrap) {
                        final ChannelFuture future = bootstrap.connect(this.serverIp, this.serverSocketPort).sync();
                        this.channel = future.channel();
                    }
                } catch (InterruptedException e) {
                    log.error("连接服务失败.......................uri:" + uri.toString(),e);
                }catch (Exception e) {
                    log.error("连接服务失败.......................uri:" + uri.toString(),e);
                }
            } catch (Exception e) {
                log.error("连接服务失败.......................",e);
            } finally {
            }
        }
    
        public void disConnect() {
            this.channel.close();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 问题代码主要在上述代码中 EventLoopGroup group = new NioEventLoopGroup() 这样使用后就导致每个WebSocket连接会创建一个新的EventLoopGroup对象,导致无法创建大量的客户端连接;
    • 正确用法应该是将EventLoopGroup提到WebSocketConnector类的上层,由WebSocketConnector对象的创建者维护一个公共的EventLoopGroup对象,所有WebSocketConnector对象共享一个EventLoopGroup对象;

    3、IO数据处理类

    
    import io.netty.channel.*;
    import io.netty.handler.codec.http.FullHttpResponse;
    import io.netty.handler.codec.http.websocketx.*;
    import io.netty.handler.timeout.IdleState;
    import io.netty.handler.timeout.IdleStateEvent;
    import io.netty.util.CharsetUtil;
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * WebSocket协议类型的模拟客户端IO处理器类
     *
     * @author duyanjun
     * @since 2022/10/13 杜燕军 新建
     */
    @Slf4j
    public class WebSocketIoHandler extends SimpleChannelInboundHandler<Object> {
    
        private final WebSocketClientHandshaker handShaker;
    
        private ChannelPromise handshakeFuture;
    
        public WebSocketIoHandler(WebSocketClientHandshaker handShaker) {
            this.handShaker = handShaker;
        }
    
        public ChannelFuture handshakeFuture() {
            return handshakeFuture;
        }
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            handshakeFuture = ctx.newPromise();
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            handShaker.handshake(ctx.channel());
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            ctx.close();
            try {
                super.channelInactive(ctx);
            } catch (Exception e) {
                log.error("channelInactive 异常.", e);
            }
            log.warn("WebSocket链路与服务器连接已断开.");
        }
    
        @Override
        public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            Channel ch = ctx.channel();
            if (!handShaker.isHandshakeComplete()) {
                try {
                    handShaker.finishHandshake(ch, (FullHttpResponse) msg);
                    handshakeFuture.setSuccess();
                    log.info("WebSocket握手成功,可以传输数据了.");
                    // 数据一定要封装成WebSocketFrame才能发达
                    String data = "Hello";
                    WebSocketFrame frame = new TextWebSocketFrame(data);
                    ch.writeAndFlush(frame);
                } catch (WebSocketHandshakeException e) {
                    log.warn("WebSocket Client failed to connect");
                    handshakeFuture.setFailure(e);
                }
                return;
            }
    
            if (msg instanceof FullHttpResponse) {
                FullHttpResponse response = (FullHttpResponse) msg;
                throw new IllegalStateException(
                        "Unexpected FullHttpResponse (getStatus=" + response.status() +
                                ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
            }
    
            WebSocketFrame frame = (WebSocketFrame) msg;
            if (frame instanceof TextWebSocketFrame) {
                TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
                String s = textFrame.text();
                log.info("WebSocket Client received message: " + s);
            } else if (frame instanceof PongWebSocketFrame) {
                log.info("WebSocket Client received pong");
            } else if (frame instanceof CloseWebSocketFrame) {
                log.info("WebSocket Client received closing");
                ch.close();
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.error("WebSocket链路由于发生异常,与服务器连接已断开.", cause);
            if (!handshakeFuture.isDone()) {
                handshakeFuture.setFailure(cause);
            }
            ctx.close();
            super.exceptionCaught(ctx, cause);
        }
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                // 如果写通道处于空闲状态,就发送心跳命令
                if (IdleState.WRITER_IDLE.equals(event.state()) || IdleState.READER_IDLE.equals(event.state())) {
                    // 发送心跳数据
                }
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113

    三、正确用法演示

    1、启动类

    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    
    public class Simulator {
        public static void start() {
            String serverIp = "127.0.0.1";
            int serverPort = 8005;
            EventLoopGroup group = new NioEventLoopGroup();
            for (int i = 0; i < 10000; i++) {
                WebSocketConnector client = new WebSocketConnector(serverIp,serverPort,group);
                client.doConnect();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    2、WebSocket连接类

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.http.DefaultHttpHeaders;
    import io.netty.handler.codec.http.HttpClientCodec;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
    import io.netty.handler.codec.http.websocketx.WebSocketVersion;
    import io.netty.handler.stream.ChunkedWriteHandler;
    import lombok.extern.slf4j.Slf4j;
    
    import java.net.URI;
    
    /**
     * WebSocket协议类型的模拟客户端连接器类
     *
     * @author duyanjun
     * @since 2022/10/13 杜燕军 新建
     */
    @Slf4j
    public class WebSocketConnector {
        // 服务器ip
        protected String serverIp;
        // 服务器通信端口
        protected int serverSocketPort;
        // 事件循环线程池
        protected EventLoopGroup group;
        // 网络通道
        private Channel channel;
    
        /**
         * WebSocket协议类型的模拟客户端连接器构造方法
         *
         * @param serverIp
         * @param serverSocketPort
         * @param group
         */
        public WebSocketConnector(String serverIp,int serverSocketPort,EventLoopGroup group) {
            this.serverIp = serverIp;
            this.serverSocketPort = serverSocketPort;
            this.group = group;
        }
    
        public void doConnect() {
            try {
                String URL = "ws://"+ this.serverIp + ":" + this.serverSocketPort + "/";
                URI uri = new URI(URL);
                final WebSocketIoHandler handler =
                        new WebSocketIoHandler(
                                WebSocketClientHandshakerFactory.newHandshaker(
                                        uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group).channel(NioSocketChannel.class)
                        //.option(ChannelOption.TCP_NODELAY, true)
                        .option(ChannelOption.SO_KEEPALIVE,true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                // 添加一个http的编解码器
                                pipeline.addLast(new HttpClientCodec());
                                // 添加一个用于支持大数据流的支持
                                pipeline.addLast(new ChunkedWriteHandler());
                                // 添加一个聚合器,这个聚合器主要是将HttpMessage聚合成FullHttpRequest/Response
                                pipeline.addLast(new HttpObjectAggregator(1024 * 64));
                                pipeline.addLast(handler);
                            }
                        });
                try {
                    synchronized (bootstrap) {
                        final ChannelFuture future = bootstrap.connect(this.serverIp, this.serverSocketPort).sync();
                        this.channel = future.channel();
                    }
                } catch (InterruptedException e) {
                    log.error("连接服务失败.......................uri:" + uri.toString(),e);
                }catch (Exception e) {
                    log.error("连接服务失败.......................uri:" + uri.toString(),e);
                }
            } catch (Exception e) {
                log.error("连接服务失败.......................",e);
            } finally {
            }
    
        }
    
        public void disConnect() {
            this.channel.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90

    3、IO数据处理类

    // Io数据处理类WebSocketIoHandler.java没有变化,使用二、错误用法演示->3、IO数据处理类中所示代码即可
    
    • 1
  • 相关阅读:
    SpringCloud01
    华为机试真题 C++ 实现【日志首次上报最多积分】【2022.11 Q4 新题】
    逻辑功能的几种基本描述方法
    Java多线程之常用的相关方法总结(线程停止、线程休眠、线程礼让、线程优先级、守护线程等等)
    八月份记录
    二叉树的学习
    如何杜绝聊天泄密事件的发生呢(企业如何管理通讯工具,防止员工聊天泄密)
    js中的基础知识点 —— 事件
    软件测试---场景法(功能测试)
    postman返回值乱码
  • 原文地址:https://blog.csdn.net/dyj095/article/details/127714472