• Netty优化与源码


    黑马程序员Netty笔记合集
    注意:由于章节连贯,此套笔记更适合学习《黑马Netty全套课程》的同学参考、复习使用。

    文章名链接
    Java NIO入门:结合尚硅谷课程文章地址
    Netty 入门文章地址
    Netty进阶文章地址 | 粘包、半包
    Netty优化与源码文章地址 | 源码分析

    一、优化

    1.1 扩展序列化算法

    序列化,反序列化主要用在消息正文的转换上

    • 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
    • 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理

    消息编解码器 MessageCodeSharable 原序列化和反序列化机制

    // 反序列化
    byte[] body = new byte[bodyLength];
    byteByf.readBytes(body);
    ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
    Message message = (Message) in.readObject();
    message.setSequenceId(sequenceId);
    
    // 序列化
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    new ObjectOutputStream(out).writeObject(message);
    byte[] bytes = out.toByteArray();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    为了支持更多序列化算法,抽象一个 Serializer 接口

    public interface Serializer {
        //反序列化
        <T> T deserializer(Class<T> clazz,byte[] bytes);
    
        //序列化
        <T> byte[] serializer(T object);
    
        enum SerializerAlgorithm implements Serializer{
            Java{
                @Override
                public <T> T deserializer(Class<T> clazz, byte[] bytes) {
                    try {
                        ObjectInputStream ois=new ObjectInputStream(new ByteArrayInputStream(bytes));
                        return (T) ois.readObject();
                    } catch (IOException | ClassNotFoundException e) {
                        throw new RuntimeException("反序列化失败!",e);
                    }
                }
    
                @Override
                public <T> byte[] serializer(T object) {
                    try {
                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
                        ObjectOutputStream oos=new ObjectOutputStream(baos);
                        oos.writeObject(object);
                        return baos.toByteArray();
                    } catch (IOException e) {
                        throw new RuntimeException("序列化失败!",e);
                    }
                }
            },
            Json{
                @Override
                public <T> T deserializer(Class<T> clazz, byte[] bytes) {
                    return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8),clazz);
                }
    
                @Override
                public <T> byte[] serializer(T object) {
                    return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8);
                }
            }
            // 需要从协议的字节中得到是哪种序列化算法
        	public static SerializerAlgorithm getByInt(int type) {
            	SerializerAlgorithm[] array = SerializerAlgorithm.values();
            	if (type < 0 || type > array.length - 1) {
                	throw new IllegalArgumentException("超过 SerializerAlgorithm 范围");
            	}
            	return array[type];
        	}
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    增加配置类和配置文件

    public abstract class Config {
        static Properties properties;
        static {
            try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
                properties = new Properties();
                properties.load(in);
            } catch (IOException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
        public static int getServerPort() {
            String value = properties.getProperty("server.port");
            if(value == null) {
                return 8080;
            } else {
                return Integer.parseInt(value);
            }
        }
        //获取配置文件中配置的序列化算法
        public static Serializer.SerializerAlgorithm getSerializerAlgorithm() {
            String value = properties.getProperty("serializer.algorithm");
            if(value == null) {
                return Serializer.SerializerAlgorithm.Java;
            } else {
                return Serializer.SerializerAlgorithm.valueOf(value);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    配置文件:application.properties

    serializer.algorithm=Json
    
    • 1

    修改编解码器

    /**
     * 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
     */
    @Slf4j
    @ChannelHandler.Sharable
    public class MessageCodeSharable extends MessageToMessageCodec<ByteBuf, Message> {
        @Override
        protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> list) throws Exception {
            ByteBuf byteBuf = ctx.alloc().buffer();
            //1.魔数:4个字节
            byteBuf.writeBytes(new byte[]{1,2,3,4});
            //2.版本号:1个字节
            byteBuf.writeByte(1);
            //3.序列化算法:1个字节,0 jdk ,1 json
            //获取配置文件中指定序的列化算法在序列化枚举数组中的索引下标
            byteBuf.writeByte(Config.getSerializerAlgorithm().ordinal());
            //4.消息类型:1个字节,由消息本身决定
            byteBuf.writeByte(msg.getMessageType());
            //5.请求序号:4个字节,由消息本身自带
            byteBuf.writeInt(msg.getSequenceId());
            byteBuf.writeByte(-1); //填充:无意义字节
            //使用指定的序列化算法转换正文内容
            byte[] bytes = Config.getSerializerAlgorithm().serializer(msg);
            //6.正文长度:4个字节
            byteBuf.writeInt(bytes.length);
            //7.消息正文:
            byteBuf.writeBytes(bytes);
            list.add(byteBuf);
        }
    
        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
            //1.魔数:4个字节
            int magicNum = byteBuf.readInt();
            //2.版本号:1个字节
            byte version=byteBuf.readByte();
            //3.序列化算法:0 jdk ,1 json
            byte serializerType=byteBuf.readByte();
            //4.指令类型:由消息本身决定
            byte messageType=byteBuf.readByte();
            //5.请求序号:由消息本身自带
            int sequenceId=byteBuf.readInt();
            byteBuf.readByte();
            //6.正文长度
            int length=byteBuf.readInt();
            //7.消息正文
            byte[] bytes=new byte[length];
            byteBuf.readBytes(bytes,0,length);
            //获取序列化算法
            Serializer.SerializerAlgorithm serializerAlgorithm = Config.getSerializerAlgorithm();
            //通过获取 Message 实现类对象
            Class<? extends Message> messageClass = Message.getMessageClass(messageType);
            //通过指定的算法将字节数组转换为 对应的Message实现类 对象
            Object message = Config.getSerializerAlgorithm().deserializer(messageClass, bytes);
            log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
            log.debug("{}", message);
            list.add(message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    消息实现类的获取

    @Data
    public abstract class Message implements Serializable {
    
        /**
         * 根据消息类型字节,获得对应的消息 class
         * @param messageType 消息类型字节
         * @return 消息 class
         */
        public static Class<? extends Message> getMessageClass(int messageType) {
            return messageClasses.get(messageType);
        }
    
        private int sequenceId;
    
        private int messageType;
    
        public abstract int getMessageType();
    
        public static final int LoginRequestMessage = 0;
        public static final int LoginResponseMessage = 1;
        public static final int ChatRequestMessage = 2;
        public static final int ChatResponseMessage = 3;
        public static final int GroupCreateRequestMessage = 4;
        public static final int GroupCreateResponseMessage = 5;
        public static final int GroupJoinRequestMessage = 6;
        public static final int GroupJoinResponseMessage = 7;
        public static final int GroupQuitRequestMessage = 8;
        public static final int GroupQuitResponseMessage = 9;
        public static final int GroupChatRequestMessage = 10;
        public static final int GroupChatResponseMessage = 11;
        public static final int GroupMembersRequestMessage = 12;
        public static final int GroupMembersResponseMessage = 13;
        public static final int PingMessage = 14;
        public static final int PongMessage = 15;
        /**
         * 请求类型 byte 值
         */
        public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
        /**
         * 响应类型 byte 值
         */
        public static final int  RPC_MESSAGE_TYPE_RESPONSE = 102;
    
        private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();
    
        static {
            messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
            messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
            messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
            messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
            messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
            messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
            messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
            messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
            messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
            messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
            messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
            messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
            messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
            messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
            messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
            messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    测试

    /**
     * 出站
     */
    @Test
    public void test(){
        LoggingHandler LOGGING = new LoggingHandler();
        MessageCodeSharable CODEC = new MessageCodeSharable();
        EmbeddedChannel channel=new EmbeddedChannel(LOGGING,CODEC,LOGGING);
        channel.writeOutbound(new ChatRequestMessage("zhangsan","lisi","Hello"));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • Java

      10:14:09 [DEBUG] [main] i.n.h.l.LoggingHandler : [id: 0xembedded, L:embedded - R:embedded] WRITE: ChatRequestMessage(super=Message(sequenceId=0, messageType=2), content=Hello, to=lisi, from=zhangsan)
      10:14:09 [DEBUG] [main] i.n.h.l.LoggingHandler : [id: 0xembedded, L:embedded - R:embedded] WRITE: 259B
               +-------------------------------------------------+
               |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
      +--------+-------------------------------------------------+----------------+
      |00000000| 01 02 03 04 01 00 02 00 00 00 00 ff 00 00 00 f3 |................|
      |00000010| ac ed 00 05 73 72 00 34 6f 72 67 2e 65 78 61 6d |....sr.4org.exam|
      |00000020| 70 6c 65 2e 6a 61 76 61 2e 63 68 61 74 52 6f 6f |ple.java.chatRoo|
      |00000030| 6d 2e 6d 65 73 73 61 67 65 2e 43 68 61 74 52 65 |m.message.ChatRe|
      |00000040| 71 75 65 73 74 4d 65 73 73 61 67 65 78 13 15 e1 |questMessagex...|
      |00000050| fe 0b 6d 55 02 00 03 4c 00 07 63 6f 6e 74 65 6e |..mU...L..conten|
      |00000060| 74 74 00 12 4c 6a 61 76 61 2f 6c 61 6e 67 2f 53 |tt..Ljava/lang/S|
      |00000070| 74 72 69 6e 67 3b 4c 00 04 66 72 6f 6d 71 00 7e |tring;L..fromq.~|
      |00000080| 00 01 4c 00 02 74 6f 71 00 7e 00 01 78 72 00 29 |..L..toq.~..xr.)|
      |00000090| 6f 72 67 2e 65 78 61 6d 70 6c 65 2e 6a 61 76 61 |org.example.java|
      |000000a0| 2e 63 68 61 74 52 6f 6f 6d 2e 6d 65 73 73 61 67 |.chatRoom.messag|
      |000000b0| 65 2e 4d 65 73 73 61 67 65 06 07 90 d6 50 f8 11 |e.Message....P..|
      |000000c0| fa 02 00 02 49 00 0b 6d 65 73 73 61 67 65 54 79 |....I..messageTy|
      |000000d0| 70 65 49 00 0a 73 65 71 75 65 6e 63 65 49 64 78 |peI..sequenceIdx|
      |000000e0| 70 00 00 00 00 00 00 00 00 74 00 05 48 65 6c 6c |p........t..Hell|
      |000000f0| 6f 74 00 08 7a 68 61 6e 67 73 61 6e 74 00 04 6c |ot..zhangsant..l|
      |00000100| 69 73 69                                        |isi             |
      +--------+-------------------------------------------------+----------------+
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
    • Json

      10:15:17 [DEBUG] [main] i.n.h.l.LoggingHandler : [id: 0xembedded, L:embedded - R:embedded] WRITE: ChatRequestMessage(super=Message(sequenceId=0, messageType=2), content=Hello, to=lisi, from=zhangsan)
      10:15:18 [DEBUG] [main] i.n.h.l.LoggingHandler : [id: 0xembedded, L:embedded - R:embedded] WRITE: 96B
               +-------------------------------------------------+
               |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
      +--------+-------------------------------------------------+----------------+
      |00000000| 01 02 03 04 01 01 02 00 00 00 00 ff 00 00 00 50 |...............P|
      |00000010| 7b 22 63 6f 6e 74 65 6e 74 22 3a 22 48 65 6c 6c |{"content":"Hell|
      |00000020| 6f 22 2c 22 74 6f 22 3a 22 6c 69 73 69 22 2c 22 |o","to":"lisi","|
      |00000030| 66 72 6f 6d 22 3a 22 7a 68 61 6e 67 73 61 6e 22 |from":"zhangsan"|
      |00000040| 2c 22 73 65 71 75 65 6e 63 65 49 64 22 3a 30 2c |,"sequenceId":0,|
      |00000050| 22 6d 65 73 73 61 67 65 54 79 70 65 22 3a 30 7d |"messageType":0}|
      +--------+-------------------------------------------------+----------------+
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12

    1.2 参数调优

    1)CONNECT_TIMEOUT_MILLIS

    • 属于 SocketChannal 参数

    • 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常

    • SO_TIMEOUT 主要用在阻塞 IO,阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间

    @Slf4j
    public class TestConnectionTimeout {
        public static void main(String[] args) {
            NioEventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap()
                        .group(group)
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300)
                        .channel(NioSocketChannel.class)
                        .handler(new LoggingHandler());
                ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
                future.sync().channel().closeFuture().sync(); // 断点1
            } catch (Exception e) {
                e.printStackTrace();
                 log.debug("timeout");
            } finally {
                group.shutdownGracefully();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    超时异常

    io.netty.channel.ConnectTimeoutException: connection timed out: localhost/127.0.0.1:8080
    	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:263)
    	at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
    	at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127)
    	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
    	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
    	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
    	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    	at java.lang.Thread.run(Thread.java:750)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    另外源码部分 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect

    /**
     * 1.线程间使用 Promise 进行通信
     * 2.根据超时时间设置 定时抛异常任务 。如果定时任务未被取消,则说明连接超时
     */
    @Override
    public final void connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        // ...
        // Schedule connect timeout.
        int connectTimeoutMillis = config().getConnectTimeoutMillis();
        if (connectTimeoutMillis > 0) {
            connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                @Override
                public void run() {                
                    ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                    ConnectTimeoutException cause =
                        new ConnectTimeoutException("connection timed out: " + remoteAddress); // 断点2
                    if (connectPromise != null && connectPromise.tryFailure(cause)) {
                        close(voidPromise());
                    }
                }
            }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
        }
    	// ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    2)SO_BACKLOG

    • 属于 ServerSocketChannal 参数
    client server syns queue accept queue bind() listen() connect() 1. SYN SYN_SEND put SYN_RCVD 2. SYN + ACK ESTABLISHED 3. ACK put ESTABLISHED accept() client server syns queue accept queue
    1. 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
    2. 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
    3. 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue

    其中

    • 在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制

    • sync queue - 半连接队列

      • 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
    • accept queue - 全连接队列

      • 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
      • 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client

    netty 中控制全连接

    可以通过 option(ChannelOption.SO_BACKLOG, 值) 来设置大小

    测试

    • 调试关键断点为:io.netty.channel.nio.NioEventLoop#processSelectedKey
    /**
     * 服务器
     */
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(2);
        try {
            Channel channel = new ServerBootstrap()
                    .channel(NioServerSocketChannel.class)
                    //设置全连接队列大小:2
                    .option(ChannelOption.SO_BACKLOG,2)
                    .group(group)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel channel) throws Exception {
                            channel.pipeline().addLast(new LoggingHandler());
                        }
                    })
                    .bind().sync().channel();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    /**
     * 客户端
     */
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(2);
        try {
            Channel channel = new Bootstrap()
                    .channel(NioSocketChannel.class)
                    .group(group)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel channel) throws Exception {
                            channel.pipeline().addLast(new LoggingHandler());
                        }
                    })
                    .connect(new InetSocketAddress("127.0.0.1", 8080)).sync().channel();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    在这里插入图片描述

    3)ulimit -n 数字

    • 属于操作系统参数
    • 属于临时调整,建议放在启动脚本里
    • 限制一个进程能够同时打开的文件描述符的数量

    4)TCP_NODELAY

    • 属于 SocketChannal 参数
    • Nagle 算法设置,默认为 true

    5)SO_SNDBUF & SO_RCVBUF

    • SO_SNDBUF 属于 SocketChannal 参数
    • SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)
    • 现在的操作系统比较智能,会自动调整。不建议设置

    6)ALLOCATOR

    • 属于 SocketChannal 参数。ByteBuf 分配器
    • 用来分配 ByteBuf, ctx.alloc()

    7)RCVBUF_ALLOCATOR

    • 属于 SocketChannal 参数
    • 控制 netty 接收缓冲区大小
    • 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定

    1.3 RPC 框架

    1)准备工作

    这些代码可以认为是现成的,无需从头编写练习

    为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息

    增加 Rpc 消息类型:Message 接口中增加

    @Data
    public abstract class Message implements Serializable {
    
        // 省略旧的代码
    
        public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
        public static final int  RPC_MESSAGE_TYPE_RESPONSE = 102;
    
        static {
            // ...
            messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
            messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    Rpc请求消息:RpcRequestMessage

    @Getter
    @ToString(callSuper = true)
    public class RpcRequestMessage extends Message {
    
        /**
         * 调用的接口全限定名,服务端根据它找到实现
         */
        private String interfaceName;
        /**
         * 调用接口中的方法名
         */
        private String methodName;
        /**
         * 方法返回类型
         */
        private Class<?> returnType;
        /**
         * 方法参数类型数组
         */
        private Class[] parameterTypes;
        /**
         * 方法参数值数组
         */
        private Object[] parameterValue;
    
        public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
            super.setSequenceId(sequenceId);
            this.interfaceName = interfaceName;
            this.methodName = methodName;
            this.returnType = returnType;
            this.parameterTypes = parameterTypes;
            this.parameterValue = parameterValue;
        }
    
        @Override
        public int getMessageType() {
            return RPC_MESSAGE_TYPE_REQUEST;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    Rpc响应消息:RpcResponseMessage

    @Data
    @ToString(callSuper = true)
    public class RpcResponseMessage extends Message {
        /**
         * 返回值
         */
        private Object returnValue;
        /**
         * 异常值
         */
        private Exception exceptionValue;
    
        @Override
        public int getMessageType() {
            return RPC_MESSAGE_TYPE_RESPONSE;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    服务器架子

    @Slf4j
    public class RpcServer {
        public static void main(String[] args) {
            NioEventLoopGroup Boss=new NioEventLoopGroup(1);
            NioEventLoopGroup Worker=new NioEventLoopGroup(2);
            LoggingHandler LOGGING_HANDLER=new LoggingHandler();
            MessageCodeSharable MESSAGE_CODEC=new MessageCodeSharable();
            
            //1.RPC消息发送处理器
            RpcRequestMessageHandler RPC_REQUEST_HANDLER=new RpcRequestMessageHandler();
            try {
                ServerBootstrap serverBootstrap=new ServerBootstrap();
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.group(Boss,Worker);
                serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ProcotolFrameDecoder()); //处理粘包半包
                        ch.pipeline().addLast(LOGGING_HANDLER); //记录日志
                        ch.pipeline().addLast(MESSAGE_CODEC); //消息协议 编码、解码
    
                        //1.RPC消息发送处理器
                        ch.pipeline().addLast(RPC_REQUEST_HANDLER);
                    }
                });
                Channel channel = serverBootstrap.bind(8080).sync().channel();
                channel.closeFuture().sync();
            } catch (InterruptedException e) {
                log.debug("Server error,{}",e);
            } finally {
                Boss.shutdownGracefully();
                Worker.shutdownGracefully();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    客户端架子

    @Slf4j
    public class RpcClient {
        public static void main(String[] args) {
            NioEventLoopGroup group=new NioEventLoopGroup(2);
            LoggingHandler LOGGING_HANDLER=new LoggingHandler();
            MessageCodeSharable MESSAGE_CODEC=new MessageCodeSharable();
    
            //1.RPC消息响应处理器
            RpcResponseMessageHandler RPC_RESPONSE_HANDLER=new RpcResponseMessageHandler();
            try {
                Bootstrap bootstrap=new Bootstrap();
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.group(group);
                bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ProcotolFrameDecoder()); //处理粘包半包
                        ch.pipeline().addLast(LOGGING_HANDLER); //记录日志
                        ch.pipeline().addLast(MESSAGE_CODEC); //消息协议 编码、解码
    
                        //1.RPC消息响应处理器
                        ch.pipeline().addLast(RPC_RESPONSE_HANDLER);
                    }
                });
                Channel channel = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8080)).sync().channel();
                channel.closeFuture().sync();
            } catch (InterruptedException e) {
                log.debug("Client error,{}",e);
            } finally {
                group.shutdownGracefully();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    相关服务

    • HelloService

      public interface HelloService {
          String sayHello(String msg);
      }
      
      • 1
      • 2
      • 3
    • HelloServiceImpl

      //有所修改
      public class HelloServiceImpl implements HelloService{
          @Override
          public String sayHello(String msg) {
              System.out.println("你好!"+msg);
              return "你好!"+msg;
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
    • ServiceFactory:通过 Class对象 获取 该类的实例对象

      public abstract class ServiceFactory {
          static Properties properties;
          static Map<Class<?>,Object> map =new ConcurrentHashMap<>();
      
          static{
              try (InputStream in = Config.class.getResourceAsStream("/application.properties")){
                  properties =new Properties();
                  properties.load(in);
                  Set<String> names =properties.stringPropertyNames();
                  for (String name :names) {
                      if (name.endsWith("Service")) {
                          Class<?> interfaceclass =Class.forName(name);
                          Class<?> instanceclass =Class.forName(properties.getProperty(name));
                          map.put(interfaceclass, instanceclass.newInstance());
                      }
                  }
              }catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e){
                  throw new ExceptionInInitializerError(e);
              }
          }
      
          public static <T> T getService(Class<T> interfaceClass){
              return (T)map.get(interfaceClass);
          }
      
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
    • application.properties

      serializer.algorithm=Json
      org.example.java.chatRoom.server.service.HelloService=org.example.java.chatRoom.server.service.HelloServiceImpl
      
      • 1
      • 2

    2)客户端发送 rpc请求

    @Slf4j
    public class RpcClient {
        public static void main(String[] args) {
            NioEventLoopGroup group=new NioEventLoopGroup(2);
            LoggingHandler LOGGING_HANDLER=new LoggingHandler();
            MessageCodeSharable MESSAGE_CODEC=new MessageCodeSharable();
    
            //2.RPC消息响应处理器
            RpcResponseMessageHandler RPC_RESPONSE_HANDLER=new RpcResponseMessageHandler();
            try {
                Bootstrap bootstrap=new Bootstrap();
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.group(group);
                bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ProcotolFrameDecoder()); //处理粘包半包
                        ch.pipeline().addLast(LOGGING_HANDLER); //记录日志
                        ch.pipeline().addLast(MESSAGE_CODEC); //消息协议 编码、解码
    
                        //2.RPC消息响应处理器
                        ch.pipeline().addLast(RPC_RESPONSE_HANDLER);
                    }
                });
                Channel channel = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8080)).sync().channel();
                //1.连接完成后发送 Rpc请求消息
                ChannelFuture channelFuture = channel.writeAndFlush(new RpcRequestMessage(
                        0,
                        "org.example.java.chatRoom.server.service.HelloService",
                        "sayHello",
                        String.class,
                        new Class[]{String.class},
                        new Object[]{"张三"}));
                channelFuture.addListener(promise->{
                    if(!promise.isSuccess()){
                        Throwable cause=promise.cause();
                        log.debug("error,{}",cause);
                    }
                });
                channel.closeFuture().sync();
            } catch (InterruptedException e) {
                log.debug("Client error,{}",e);
            } finally {
                group.shutdownGracefully();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    在这里插入图片描述

    3)服务器接收并响应

    @Slf4j
    @ChannelHandler.Sharable
    public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) throws Exception {
            RpcResponseMessage response = new RpcResponseMessage();
            response.setSequenceId(message.getSequenceId());
            try {
                //1.获取实现对象
                String interfaceName = message.getInterfaceName();
                HelloService service = (HelloService) ServiceFactory.getService(Class.forName(interfaceName));
                //2.调用方法
                String methodName = message.getMethodName();
                Class[] parameterTypes = message.getParameterTypes();
                Object[] parameterValue = message.getParameterValue();
                Method method = service.getClass().getMethod(methodName, parameterTypes);
                Object result = method.invoke(service, parameterValue);
                //3.设置返回值
                response.setReturnValue(result);
            } catch (ClassNotFoundException e) {
                log.debug("RpcRequestHandler erro,{}",e);
                response.setExceptionValue(e);
            } finally {
            }
            ctx.writeAndFlush(response);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    在这里插入图片描述

    4)客户端简单处理响应

    @Slf4j
    @ChannelHandler.Sharable
    public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage message) throws Exception {
            log.debug("{}", message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在这里插入图片描述

    5)客户端2.0:管理Channel

    包括 channel 管理,代理,接收结果

    @Slf4j
    public class RpcClientManager {
        public static void main(String[] args) {
            HelloService service=getProxyService(HelloService.class);
            //方法调用暂时没有考虑返回值
            service.sayHello("小红");
        }
    
        //创建代理类对象:代理调用服务端的方法
        public static <T> T getProxyService(Class<T> serviceClass){
            //类加载器
            ClassLoader classLoader = serviceClass.getClassLoader();
            //代理类的接口数组
            Class<?>[] classes = {serviceClass};
            //方法处理器
            Object o= Proxy.newProxyInstance(classLoader,classes,(proxy, method, args)->{
                //1.生成 rpc消息对象
                int serquenceId= SequenceIdGenerator.nextId();
                RpcRequestMessage message = new RpcRequestMessage(
                        serquenceId,
                        serviceClass.getName(),
                        method.getName(),
                        method.getReturnType(),
                        method.getParameterTypes(),
                        args);
                //2.发送消息
                getChannel().writeAndFlush(message);
                //3.暂时返回null
                return null;
            });
            return (T) o;
        }
    
    
        private static Channel channel=null;
        //获取单例channel
        public static Channel getChannel(){
            if(channel!=null) return channel;
            synchronized (RpcClientManager.class){
                if (channel!=null) return channel;
                initChannel();
                return channel;
            }
        }
        //初始化Channel
        private static void initChannel() {
            NioEventLoopGroup group=new NioEventLoopGroup(2);
            LoggingHandler LOGGING_HANDLER=new LoggingHandler();
            MessageCodeSharable MESSAGE_CODEC=new MessageCodeSharable();
    
            //RPC消息响应处理器
            RpcResponseMessageHandler RPC_RESPONSE_HANDLER=new RpcResponseMessageHandler();
            Bootstrap bootstrap=new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder()); //处理粘包半包
                    ch.pipeline().addLast(LOGGING_HANDLER); //记录日志
                    ch.pipeline().addLast(MESSAGE_CODEC); //消息协议 编码、解码
    
                    //RPC消息响应处理器
                    ch.pipeline().addLast(RPC_RESPONSE_HANDLER);
                }
            });
            try {
                channel = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8080)).sync().channel();
                channel.closeFuture().addListener(future -> {
                    group.shutdownGracefully();
                });
            } catch (InterruptedException e) {
                log.debug("Client error,{}",e);
                group.shutdownGracefully();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • SequenceIdGenerator

      public class SequenceIdGenerator {
          private static final AtomicInteger id=new AtomicInteger();
          public static int nextId(){
              return id.incrementAndGet();
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6

    在这里插入图片描述

    6)客户端3.0:等待响应

    发送 rpc请求 的线程等待响应结果

    @Slf4j
    public class RpcClientManager {
        public static void main(String[] args) {
            HelloService service=getProxyService(HelloService.class);
            System.out.println(service.sayHello("小红"));
            System.out.println(service.sayHello("小明"));
            System.out.println(service.sayHello("小紫"));
        }
    
        //创建代理类对象
        public static <T> T getProxyService(Class<T> serviceClass){
            //类加载器
            ClassLoader classLoader = serviceClass.getClassLoader();
            //代理类的接口数组
            Class<?>[] classes = {serviceClass};
            //方法处理器
            Object o= Proxy.newProxyInstance(classLoader,classes,(proxy, method, args)->{
                //1.生成 rpc消息对象
                int serquenceId= SequenceIdGenerator.nextId();
                RpcRequestMessage message = new RpcRequestMessage(
                        serquenceId,
                        serviceClass.getName(),
                        method.getName(),
                        method.getReturnType(),
                        method.getParameterTypes(),
                        args);
                //2.发送消息
                getChannel().writeAndFlush(message);
                //3.创建 promise容器 接收返回消息:并指定 promise对象 异步接收结果线程
                DefaultPromise<Object> promise=new DefaultPromise<>(getChannel().eventLoop());
                RpcResponseMessageHandler.PROMISES.put(serquenceId,promise);
                //4.等待结果返回
                promise.await();
                //5.返回结果
                if (promise.isSuccess()) {
                    return promise.getNow();
                }else {
                    throw new RuntimeException(promise.cause());
                }
            });
            return (T) o;
        }
    
    
        private static Channel channel=null;
    
        public static Channel getChannel(){
            if(channel!=null) return channel;
            synchronized (RpcClientManager.class){
                if (channel!=null) return channel;
                initChannel();
                return channel;
            }
        }
    
        private static void initChannel() {
            NioEventLoopGroup group=new NioEventLoopGroup(2);
            LoggingHandler LOGGING_HANDLER=new LoggingHandler();
            MessageCodeSharable MESSAGE_CODEC=new MessageCodeSharable();
    
            //RPC消息响应处理器
            RpcResponseMessageHandler RPC_RESPONSE_HANDLER=new RpcResponseMessageHandler();
            Bootstrap bootstrap=new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder()); //处理粘包半包
                    ch.pipeline().addLast(LOGGING_HANDLER); //记录日志
                    ch.pipeline().addLast(MESSAGE_CODEC); //消息协议 编码、解码
    
                    //RPC消息响应处理器
                    ch.pipeline().addLast(RPC_RESPONSE_HANDLER);
                }
            });
            try {
                channel = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8080)).sync().channel();
                channel.closeFuture().addListener(future -> {
                    group.shutdownGracefully();
                });
            } catch (InterruptedException e) {
                log.debug("Client error,{}",e);
                group.shutdownGracefully();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87

    处理响应线程设置返回结果

    @Slf4j
    @ChannelHandler.Sharable
    public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
        //响应后将返回结果与对应的请求进行设置
        public static final Map<Integer, Promise<Object>> PROMISES=new ConcurrentHashMap<>();
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage message) throws Exception {
            //将响应结果返回给调用者
            Promise<Object> promise = PROMISES.remove(message.getSequenceId());
            Object returnValue = message.getReturnValue();
            Exception exceptionValue = message.getExceptionValue();
            if(exceptionValue!=null){
                promise.setFailure(exceptionValue);
            }else{
                promise.setSuccess(returnValue);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    在这里插入图片描述

    7)几个问题

    • RpcResponseMessageHandler 中 PROMISES集合中的泛型问题:Promise,不能为 Promise

      • Promise 只能用来接收值,不能设置值。除非设置 null
    • RpcResponseMessageHandler 中将响应结果设置到 promise 的同时,需要从集合中移除

    • RpcResponseMessageHandler 是否可以共享?

      • 可以。因为共享变量为 ConcurrentHashMap,不存在线程安全问题。
    • 远程方法调用出错解决:比如 sayHello出现除0异常

      1. 客户断收到的响应数据太长:LengthFieldBasedFrameDecoder 抛出 TooLongFrameException

      2. 将响应信息中异常的信息缩短

        try {
            ......
            response.setReturnValue(result);
        } catch (Exception e) {
            log.debug("RpcRequestHandler erro,{}",e);
            //缩短异常的信息
            String msg = e.getCause().getMessage();
            response.setExceptionValue(new Exception("远程调用出错:"+msg));
        } finally {
            ctx.writeAndFlush(response);
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
    • 二、源码分析

      2.1 启动剖析

      Nio流程总览

      //1 netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
      Selector selector = Selector.open(); 
      
      //2 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
      NioServerSocketChannel attachment = new NioServerSocketChannel();
      
      //3 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
      ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); 
      serverSocketChannel.configureBlocking(false);
      
      //4 启动 nio boss 线程执行接下来的操作
      
      //5 注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
      SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);
      
      //6 head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor
      
      //7 绑定端口
      serverSocketChannel.bind(new InetSocketAddress(8080));
      
      //8 触发 channel active 事件,在 head 中关注 op_accept 事件
      selectionKey.interestOps(SelectionKey.OP_ACCEPT);
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22

      启动跟源码

      服务器入口 io.netty.bootstrap.ServerBootstrap#bind

      关键代码 io.netty.bootstrap.AbstractBootstrap#doBind

      private ChannelFuture doBind(final SocketAddress localAddress) {
      	//1.执行 初始化
          //2.异步执行 注册
          final ChannelFuture regFuture = initAndRegister();
          final Channel channel = regFuture.channel();
          if (regFuture.cause() != null) {
              return regFuture;
          }
          
          //已经完成:
          if (regFuture.isDone()) {
              ChannelPromise promise = channel.newPromise();
              //3.立刻调用 doBind0
              doBind0(regFuture, channel, localAddress, promise);
              return promise;
          } 
          //没有完成:添加回调函数
          else {
              final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
              regFuture.addListener(new ChannelFutureListener() {
                  @Override
                  public void operationComplete(ChannelFuture future) throws Exception {
                      Throwable cause = future.cause();
                      if (cause != null) {
                          promise.setFailure(cause);//处理异常...
                      } else {
                          promise.registered();
      					//3.由注册线程去执行 doBind0:绑定端口号、触发 active 事件、注册accept事件
                          doBind0(regFuture, channel, localAddress, promise);
                      }
                  }
              });
              return promise;
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35

      关键代码 io.netty.bootstrap.AbstractBootstrap#initAndRegister

      //1.执行 初始化
      //2.异步执行 注册
      final ChannelFuture initAndRegister() {
          Channel channel = null;
          try {
              //1 初始化
              //1.1 创建 NioServerSocketChannel
              channel = channelFactory.newChannel();
              //1.2 给 NioServerSocketChannel 添加一个初始化器 ChannelInitializer
              init(channel);
          } catch (Throwable t) { // 处理异常...
              return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
          }
      
          //2 注册:将原生 channel 注册到 selector 上
          ChannelFuture regFuture = config().group().register(channel);
          if (regFuture.cause() != null) { // 处理异常...
          }
          return regFuture;
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20

      关键代码 io.netty.bootstrap.ServerBootstrap#init

      //1.2 给 NioServerSocketChannel 添加一个初始化器 ChannelInitializer
      void init(Channel channel) throws Exception {
          final Map<ChannelOption<?>, Object> options = options0();
          synchronized (options) {
              setChannelOptions(channel, options, logger);
          }
      
          final Map<AttributeKey<?>, Object> attrs = attrs0();
          synchronized (attrs) {
              for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                  @SuppressWarnings("unchecked")
                  AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                  channel.attr(key).set(e.getValue());
              }
          }
      
          ChannelPipeline p = channel.pipeline();
      
          final EventLoopGroup currentChildGroup = childGroup;
          final ChannelHandler currentChildHandler = childHandler;
          final Entry<ChannelOption<?>, Object>[] currentChildOptions;
          final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
          synchronized (childOptions) {
              currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
          }
          synchronized (childAttrs) {
              currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
          }
      	
          //1.2 为 NioServerSocketChannel 添加初始化器:初始化器什么时候执行?
          p.addLast(new ChannelInitializer<Channel>() {
              @Override
              public void initChannel(final Channel ch) throws Exception {
                  final ChannelPipeline pipeline = ch.pipeline();
                  ChannelHandler handler = config.handler();
                  if (handler != null) {
                      pipeline.addLast(handler);
                  }
      
                  //1.2.1 初始化器的职责:将 ServerBootstrapAcceptor 加入至 NioServerSocketChannel
                  ch.eventLoop().execute(new Runnable() {
                      @Override
                      public void run() {
                          pipeline.addLast(new ServerBootstrapAcceptor(
                                  ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                      }
                  });
              }
          });
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50

      关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#register

      • ServerBootStrap—>eventLoopGroup—>eventLoop—>Channel
      //2 注册:切换线程,并将原生 channel 注册到 selector 上
      public final void register(EventLoop eventLoop, final ChannelPromise promise) {
          // 一些检查...
          AbstractChannel.this.eventLoop = eventLoop;
      
          if (eventLoop.inEventLoop()) { //是否是EventLoop线程
              register0(promise);
          } else {
              try {
                  //2.1 切换线程
                  // 首次执行 execute 方法时:才启动 nio 线程,之后注册等操作在 nio 线程上执行
                  // 因为只有一个 NioServerSocketChannel 因此,也只会有一个 boss nio 线程
                  eventLoop.execute(new Runnable() {
                      @Override
                      public void run() {
                          //2.2 将原生 channel 注册到 selector 上
                          register0(promise);
                      }
                  });
              } catch (Throwable t) {
                  // 日志记录...
                  closeForcibly();
                  closeFuture.setClosed();
                  safeSetFailure(promise, t);
              }
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27

      io.netty.channel.AbstractChannel.AbstractUnsafe#register0

      //2.2 将原生 channel 注册到 selector 上
      private void register0(ChannelPromise promise) {
          try {
              if (!promise.setUncancellable() || !ensureOpen(promise)) {
                  return;
              }
              boolean firstRegistration = neverRegistered;
              // 2.2.1 原生的 nio channel 绑定到 selector 上,注意此时没有注册 selector 关注事件,附件为 NioServerSocketChannel
              //this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
              doRegister();
              neverRegistered = false;
              registered = true;
      
              // 2.2.2 执行 NioServerSocketChannel 初始化器的 initChannel,回到 1.2
              pipeline.invokeHandlerAddedIfNeeded();
      
              // 2.2.3 设置initAndRegister()执行结果,回到 3.绑定端口号、注册accept事件
              safeSetSuccess(promise);
              pipeline.fireChannelRegistered();
              
              if (isActive()) {// 对应 server socket channel 还未绑定,isActive 为 false
                  if (firstRegistration) {
                      pipeline.fireChannelActive();
                  } else if (config().isAutoRead()) {
                      beginRead();
                  }
              }
          } catch (Throwable t) {
              closeForcibly();
              closeFuture.setClosed();
              safeSetFailure(promise, t);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33

      关键代码 io.netty.bootstrap.AbstractBootstrap#doBind0

      //3.绑定端口号、触发 active 事件、注册accept事件
      private static void doBind0(
              final ChannelFuture regFuture, final Channel channel,
              final SocketAddress localAddress, final ChannelPromise promise) {
      
          channel.eventLoop().execute(new Runnable() {
              @Override
              public void run() {
                  if (regFuture.isSuccess()) {
                      channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                  } else {
                      promise.setFailure(regFuture.cause());
                  }
              }
          });
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16

      关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#bind

      //3.绑定端口号、触发 active 事件、注册accept事件
      public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
          assertEventLoop();
      
          if (!promise.setUncancellable() || !ensureOpen(promise)) {
              return;
          }
      
          if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
              localAddress instanceof InetSocketAddress &&
              !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
              !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {// 记录日志...
          }
      
          boolean wasActive = isActive();
          try {
              //3.1 ServerSocketChannel 绑定端口
              doBind(localAddress);
          } catch (Throwable t) {
              safeSetFailure(promise, t);
              closeIfClosed();
              return;
          }
      
          if (!wasActive && isActive()) {
              invokeLater(new Runnable() {
                  @Override
                  public void run() {
                      //3.2 触发 active 事件、注册accept事件
                      pipeline.fireChannelActive();
                  }
              });
          }
      
          safeSetSuccess(promise);
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36

      关键代码 io.netty.channel.socket.nio.NioServerSocketChannel#doBind

      //3.1 ServerSocketChannel 绑定端口
      protected void doBind(SocketAddress localAddress) throws Exception {
          if (PlatformDependent.javaVersion() >= 7) {
              javaChannel().bind(localAddress, config.getBacklog());
          } else {
              javaChannel().socket().bind(localAddress, config.getBacklog());
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8

      关键代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

      //3.2 触发 active 事件、注册accept事件
      public void channelActive(ChannelHandlerContext ctx) {
          //3.2.1 触发 active 事件
          ctx.fireChannelActive(); 
      	//3.2.2 注册accept事件
          readIfIsAutoRead();
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7

      关键代码 io.netty.channel.nio.AbstractNioChannel#doBeginRead

      //3.2.2 注册accept事件
      protected void doBeginRead() throws Exception {
          // Channel.read() or ChannelHandlerContext.read() was called
          final SelectionKey selectionKey = this.selectionKey;
          if (!selectionKey.isValid()) {
              return;
          }
      
          readPending = true;
      
          final int interestOps = selectionKey.interestOps();
          // readInterestOp 取值是 16,在 NioServerSocketChannel 创建时初始化好,代表关注 accept 事件
          if ((interestOps & readInterestOp) == 0) {
              selectionKey.interestOps(interestOps | readInterestOp);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16

      2.2 NioEventLoop 剖析

      NioEventLoop 线程不仅要处理 IO 事件,还要处理 Task(包括普通任务和定时任务),

      • selector:unwrappedSelector(Selector)、selector(SelectedSelectionKeySetSelector)
      • 线程:thread(由executor中唯一的线程赋值)、executor(Executor)
      • 任务队列:taskQueue(Queue)、scheduledTaskQueue(PriorityQueue>)

      1)何时创建selector

      • 在构造方法调用时创建
      /**
       * io.netty.channel.nio.NioEventLoop#NioEventLoop
       */
      NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
              super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler);
              if (selectorProvider == null) {
                  throw new NullPointerException("selectorProvider");
              } else if (strategy == null) {
                  throw new NullPointerException("selectStrategy");
              } else {
                  this.provider = selectorProvider;
                  //在构造方法调用时创建:赋值给unwrappedSelector
                  NioEventLoop.SelectorTuple selectorTuple = this.openSelector();
                  this.selector = selectorTuple.selector;
                  this.unwrappedSelector = selectorTuple.unwrappedSelector;
                  this.selectStrategy = strategy;
              }
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18

      2)为什么会有两个selector

      • unwrappedSelector:原始的Selector,将 selectedKeys属性 的Set实现改为了数组(SelectedSelectionKeySet )的实现
      • selector:SelectedSelectionKeySetSelector 实例,内部包装了 unwrappedSelector 、SelectedSelectionKeySet 数组
      /**
       * io.netty.channel.nio.NioEventLoop#NioEventLoop
       */
      NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
          //1.获取两个Selector
          NioEventLoop.SelectorTuple selectorTuple = this.openSelector();
          //2.赋值
          this.selector = selectorTuple.selector;
          this.unwrappedSelector = selectorTuple.unwrappedSelector;
      }
      	//1.获取两个Selector
      	private NioEventLoop.SelectorTuple openSelector() {
              //引用 unwrappedSelector 中的 selectedKeySet
              this.selectedKeys = selectedKeySet;
      	    //...改为了数组实现:数组实现可以提高遍历性能(原本为 HashSet)
      	    return new NioEventLoop.SelectorTuple(
      	        //原始的Selector
      	        unwrappedSelector, 
      	        //包装后的Selector
      	        new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)
      	    );
      	}
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22

      3)thread何时启动

      • 首次调用 execute 方法时,将 executor 中唯一的线程赋值给 thread

      • 执行该线程任务,任务为死循环,不断查看是否有任务(调用selector.select(timeoutMills))

        /**
         * io.netty.util.concurrent.SingleThreadEventExecutor#execute
         */
        public void execute(Runnable task) 
            // 添加任务,其中队列使用了 jctools 提供的 mpsc 无锁队列
            this.addTask(task);
        	//1.首次调用,启动线程
            this.startThread();
        	//2.添加任务后执行wakeup
        	if (!this.addTaskWakesUp && this.wakesUpForTask(task)) {
        	    this.wakeup(inEventLoop);
        	}
        }
        	//1.首次调用,启动线程
            private void startThread() {
                this.doStartThread();
            }
            	private void doStartThread() {
               		this.executor.execute(new Runnable() {
                        //1.将 executor 中唯一的线程赋值给 thread
                        SingleThreadEventExecutor.this.thread = Thread.currentThread();
                        //2.执行该线程的 run 方法,进入死循环
                        SingleThreadEventExecutor.this.run();
                    }     
            	}
            				//2.执行thread任务:执行死循环,不断看有没有新任务、IO 事件 。循环+阻塞
            				protected void run() {
            				    while(true) {
            				        while(true) {
            				            while(true) {
            				                try {
            				                    try {
            				                        switch() {
            				                        case -3:
            				                        case -1:
            				                            //1.调用select
            				                            this.select(this.wakenUp.getAndSet(false));
            				                            if (this.wakenUp.get()) {
            				                                this.selector.wakeup();
            				                            }
            				                            break;
            				                        case -2:
            				                            continue;
            				                        }
            				                    }
                                                //...执行任务
            				                } 
            				            }
            				        }
            				    }
            				}
        					    //1.调用select
        					    private void select(boolean oldWakenUp) throws IOException {
        					        try {
        					            while(true) {
        					                //2.阻塞
        					                int selectedKeys = selector.select(timeoutMillis);
        					            }
        					        } catch (CancelledKeyException var13) {}
        					    }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
        • 33
        • 34
        • 35
        • 36
        • 37
        • 38
        • 39
        • 40
        • 41
        • 42
        • 43
        • 44
        • 45
        • 46
        • 47
        • 48
        • 49
        • 50
        • 51
        • 52
        • 53
        • 54
        • 55
        • 56
        • 57
        • 58
        • 59
        • 60

      4)普通任务会不会结束 select 阻塞

      • 会。非Nio线程每次调用 execute 方法后,会执行一次 wakeup

        /**
         * io.netty.util.concurrent.SingleThreadEventExecutor#execute
         */
        public void execute(Runnable task) 
            this.addTask(task);
            this.startThread();
        	//1.添加任务后执行wakeup
        	if (!this.addTaskWakesUp && this.wakesUpForTask(task)) {
        	    this.wakeup(inEventLoop);
        	}
        }
            protected void wakeup(boolean inEventLoop) {
                // 如果线程由于 IO select 阻塞了,添加的任务的线程需要负责唤醒 NioEventLoop 线程
                if (!inEventLoop && this.wakenUp.compareAndSet(false, true)) {
                    this.selector.wakeup();
                }
            }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17

      5)普通任务 wakeup 理解

      /**
       * io.netty.channel.nio.NioEventLoop#wakeup
       */
      protected void wakeup(boolean inEventLoop) {
          //1.提交任务的线程不是Nio线程才会进入if块
          //2.保证多个非Nio线程同时提交任务后只唤醒一次
          if (!inEventLoop && this.wakenUp.compareAndSet(false, true)) {
              this.selector.wakeup();
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10

      6)thread什么时候 select

      • 没有任务时:返回-1,进入阻塞逻辑
      • 有任务时:调用 selectNow(返回0-…) 顺便拿到io 事件,执行任务
      /**
       * io.netty.channel.nio.NioEventLoop#run
       */
      //执行死循环,不断看有没有新任务、IO 事件 。循环+阻塞
      protected void run() {
          while(true) {
              while(true) {
                  while(true) {
                      try {
                          try {
                              //1.当返回-1时进入阻塞逻辑
                              switch(this.selectStrategy.calculateStrategy(this.selectNowSupplier, this.hasTasks())) {
                              case -3:
                              case -1:
                                  //进入 select 逻辑
                                  this.select(this.wakenUp.getAndSet(false));
                                  if (this.wakenUp.get()) {
                                      this.selector.wakeup();
                                  }
                                  break;
                              case -2:
                                  continue;
                              }
                          }
                          //...执行任务
                      } 
                  }
              }
          }
      }
      	//1.没有任务时返回-1,进入阻塞逻辑
          public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
              return hasTasks ? selectSupplier.get() : -1;
          } 
      		//2.有任务时调用 selectNow(返回0-...) 顺便拿到io 事件
          	private final IntSupplier selectNowSupplier = new IntSupplier() {
          	    public int get() throws Exception {
          	        return NioEventLoop.this.selectNow();
          	    }
          	};
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40

      7)select 阻塞多久

      • 超时时间:(1s+0.5ms)/1ms = 1000ms
      • 退出阻塞:到达截至时间(1s)、存在普通任务、发生io事件、被唤醒、被打断
      /**
       *	io.netty.channel.nio.NioEventLoop#select
       */
      //进入 select 逻辑
      private void select(boolean oldWakenUp) throws IOException {
          //1.获取当前时间
          long currentTimeNanos = System.nanoTime();
          //2.没有定时任务,截至时间:当前时间 + 1s
          //2.存在定时任务,截至时间:下一个定时任务执行时间 - 当前时间
          long selectDeadLineNanos = currentTimeNanos + this.delayNanos(currentTimeNanos);
          //截至时间不变,当前时间改变
          while(true) {
              //3.超时时间:(1s+0.0005s)/1ms = 1000ms
              long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
              //到达截至时间:退出阻塞
              if (timeoutMillis <= 0L) {
                  if (selectCnt == 0) {
                      selector.selectNow();
                      selectCnt = 1;
                  }
                  break;
              }
              //有普通任务:退出阻塞。如果没这个判断,那么任务就会等到下次 select 超时时才能被执行
              if (this.hasTasks() && this.wakenUp.compareAndSet(false, true)) {
                  selector.selectNow();
                  selectCnt = 1;
                  break;
              }
              int selectedKeys = selector.select(timeoutMillis);
              //醒来后,有 IO 事件、非 EventLoop 线程唤醒、有任务:退出阻塞
              if (selectedKeys != 0 || oldWakenUp || this.wakenUp.get() || this.hasTasks() || this.hasScheduledTasks()) {
                          break;
              }
              
              long time = System.nanoTime();
              currentTimeNanos = time;
          }
      }
      	//2.获取截至时间
          protected long delayNanos(long currentTimeNanos) {
              ScheduledFutureTask<?> scheduledTask = this.peekScheduledTask(); //不考虑
              return scheduledTask == null ? SCHEDULE_PURGE_INTERVAL : scheduledTask.delayNanos(currentTimeNanos);
          }
      		static {
              	SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1L);
          	}
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46

      8)BUG解决:select空轮询

      • BUG解释:即 select 不阻塞(jdk 在linux中才会出现)
      • BUG解决:空轮询超过阈值(默认512),重建、替换旧的 selector,并退出阻塞
      /**
       *	io.netty.channel.nio.NioEventLoop#select
       */
      private void select(boolean oldWakenUp) throws IOException {
          try {
              int selectCnt = 0;
              //1.循环+阻塞:如果出现bug没阻塞即空轮询,则 selectCnt++
              while(true) {
                  int selectedKeys = selector.select(timeoutMillis);
                  ++selectCnt;
                  //2.selectCnt超出阈值:重建并替换旧的 selector,退出阻塞
                  else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                      selector = this.selectRebuildSelector(selectCnt);
                      selectCnt = 1;
                      break;
                  }
              }
          } 
      }
      static {
          //3.阈值默认值:512
          int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
          if (selectorAutoRebuildThreshold < 3) {
              selectorAutoRebuildThreshold = 0;
          }
          SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27

      9)thread 执行任务

      • 有多少任务执行多少任务
      • 按时间比例执行任务
        • 各占50%:io任务执行多久,普通任务就执行多久
      /**
       *	io.netty.channel.nio.NioEventLoop#run
       */
      //设置执行io任务的时间比例50%
      private volatile int ioRatio = 50;
      //执行死循环,不断看有没有新任务、IO 事件 。循环+阻塞
      protected void run() {
          while(true) {
              while(true) {
                  while(true) {
                      try {
                          int ioRatio = this.ioRatio;
                          //1.比例设置为 100:则时间分配无效,该次循环存在多少任务执行多少任务
                          if (ioRatio == 100) {
                              try {
                                  this.processSelectedKeys();
                              } finally {
                                  this.runAllTasks();
                              }
                          } 
                          //2.按时间比例执行任务
                          else {
                              long ioStartTime = System.nanoTime();
                              boolean var14 = false;
                              try {
                                  var14 = true;
                                  this.processSelectedKeys();
                                  var14 = false;
                              } finally {
                                  if (var14) {
                                      long ioTime = System.nanoTime() - ioStartTime;this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);
                                  }
                              }
                              //2.1 获取 io任务 执行所用时间
                              //2.2 执行普通任务:所用时间与 io任务 相同
                              long ioTime = System.nanoTime() - ioStartTime;
                              this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);
                          }
                      }
                  }
              }
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44

      10)区分不同事件

      /**
       *	io.netty.channel.nio.NioEventLoop#run
       */
      //1.执行io任务
      protected void run() {
          while(true) {
              while(true) {
                  while(true) {
                      try {
                          else {
                              try {
                                  this.processSelectedKeys();
                              }
                          }
                      }
                  }
              }
          }
      }
          private void processSelectedKeys() {
              //2.如果selectedKeySet已替换为数组实现
              //数组实现可以提高遍历性能(原本为 HashSet)
              if (this.selectedKeys != null) {
                  this.processSelectedKeysOptimized();
              }
          }
          	private void processSelectedKeysOptimized() {
          	    for(int i = 0; i < this.selectedKeys.size; ++i) {
          	        SelectionKey k = this.selectedKeys.keys[i];
          	        this.selectedKeys.keys[i] = null;
          	        //3.获得事件相关的 Channel
          	        Object a = k.attachment();
          	        if (a instanceof AbstractNioChannel) {
          	            this.processSelectedKey(k, (AbstractNioChannel)a);
          	        } 
          	    }
          	}
      			//4.根据事件类型执行任务
          		private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
          		    NioUnsafe unsafe = ch.unsafe();
          		    if (!k.isValid()) {}
          		    //key有效
          		    else {
          		        try {
          		            int readyOps = k.readyOps();
                              //连接事件
          		            if ((readyOps & 8) != 0) {
          		                int ops = k.interestOps();
          		                ops &= -9;
          		                k.interestOps(ops);
          		                unsafe.finishConnect();
          		            }
                              //可写事件
          		            if ((readyOps & 4) != 0) {
          		                ch.unsafe().forceFlush();
          		            }
                              //可读、可接入事件
          		            if ((readyOps & 17) != 0 || readyOps == 0) {
                                  // 如果是可接入 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
                  				// 如果是可读 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
          		                unsafe.read();
          		            }
          		        }
          		    }
          		}
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      ⚠️ 注意

      这里有个费解的地方就是 wakeup,它既可以由提交任务的线程来调用(比较好理解),也可以由 EventLoop 线程来调用(比较费解),这里要知道 wakeup 方法的效果:

      • 由非 EventLoop 线程调用,会唤醒当前在执行 select 阻塞的 EventLoop 线程
      • 由 EventLoop 自己调用,本次的 wakeup 会取消下一次的 select 操作

      在这里插入图片描述

      2.3 accept 剖析

      nio 流程总览

      //1 阻塞直到事件发生
      selector.select();
      
      Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
      while (iter.hasNext()) {    
          //2 拿到一个事件
          SelectionKey key = iter.next();
          
          //3 如果是 accept 事件
          if (key.isAcceptable()) {
              
              //4 执行 accept
              SocketChannel channel = serverSocketChannel.accept();
              channel.configureBlocking(false);
              
              //5 关注 read 事件
              channel.register(selector, SelectionKey.OP_READ);
          }
          // ...
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20

      启动跟源码

      服务器入口io.netty.channel.nio.NioEventLoop#processSelectedKey

      /**
       * io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
       */
      public void read() {
          try {
              try {
                  do {
                      //1.ServerScoketChannel 执行 accept 创建 SocketChannel
      				//2.将 SocketChannel 包装为 NioSocketChannel、设置非阻塞,然后将 SocketChannel 作为消息放入 readBuf
                      int localRead = doReadMessages(readBuf);
                      if (localRead == 0) {
                          break;
                      }
                      if (localRead < 0) {
                          closed = true;
                          break;
                      }
      				// localRead 为 1,就一条消息,即接收一个客户端连接
                      allocHandle.incMessagesRead(localRead);
                  } while (allocHandle.continueReading());
              } catch (Throwable t) {
                  exception = t;
              }
      
              int size = readBuf.size();
              for (int i = 0; i < size; i ++) {
                  readPending = false;
                  //3.进入 NioServerSocketChannel 的流水线:
                  // 触发 read 事件,让 pipeline 上的 handler 处理
                  pipeline.fireChannelRead(readBuf.get(i));
              }
              readBuf.clear();
              allocHandle.readComplete();
              pipeline.fireChannelReadComplete();
      
              if (exception != null) {
                  closed = closeOnReadError(exception);
      
                  pipeline.fireExceptionCaught(exception);
              }
      
              if (closed) {
                  inputShutdown = true;
                  if (isOpen()) {
                      close(voidPromise());
                  }
              }
          } finally {
              if (!readPending && !config.isAutoRead()) {
                  removeReadOp();
              }
          }
      }
      	//1.
      	//2.
      	protected int doReadMessages(List<Object> buf) throws Exception {
              //1.ServerScoketChannel 执行 accept 创建 SocketChannel
              SocketChannel ch = SocketUtils.accept(this.javaChannel());
              try {
                  if (ch != null) {
                      //2.将 SocketChannel 包装为 NioSocketChannel、设置非阻塞,然后将 SocketChannel 作为消息放入 readBuf
                      buf.add(new NioSocketChannel(this, ch));
                      return 1;
                  }
              }
              return 0;
          }
      		//1.ServerScoketChannel 执行 accept 创建 SocketChannel
          	public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
          	    try {
          	        return (SocketChannel)AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
          	            public SocketChannel run() throws IOException {
          	                return serverSocketChannel.accept();
          	            }
          	        });
          	    }
          	}
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77

      关键代码 io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead

      //3.进入 NioServerSocketChannel 的流水线
      public void channelRead(ChannelHandlerContext ctx, Object msg) {
          
          final Channel child = (Channel) msg; // 这时的 msg 是 NioSocketChannel
      
          child.pipeline().addLast(childHandler); // NioSocketChannel 添加  childHandler 即初始化器
      
          setChannelOptions(child, childOptions, logger); // 设置选项
      
          for (Entry<AttributeKey<?>, Object> e: childAttrs) {
              child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
          }
      
          try {
              //4.将 NioSocketChannel 注册到新的 NioEventLoop 线程中
              childGroup.register(child).addListener(new ChannelFutureListener() {
                  @Override
                  public void operationComplete(ChannelFuture future) throws Exception {
                      if (!future.isSuccess()) {
                          forceClose(child, future.cause());
                      }
                  }
              });
          } catch (Throwable t) {
              forceClose(child, t);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27

      又回到启动剖析中熟悉的 io.netty.channel.AbstractChannel.AbstractUnsafe#register 方法

      //4.切换线程:异步将 NioSocketChannel 注册到新的 NioEventLoop 线程中
      public final void register(EventLoop eventLoop, final ChannelPromise promise) {
          // 一些检查...
          AbstractChannel.this.eventLoop = eventLoop;
      
          if (eventLoop.inEventLoop()) { //是否是EventLoop线程
              register0(promise);
          } else {
              try {
                  //4.1 切换线程:这行代码完成的事实是 nio boss -> nio worker 线程的切换
                  // 首次执行 execute 方法时:才启动 nio 线程,之后注册等操作在 nio 线程上执行
                  eventLoop.execute(new Runnable() {
                      @Override
                      public void run() {
                          //4.2 将 NioSocketChannel 注册到新的 NioEventLoop 线程中
                          register0(promise);
                      }
                  });
              }
              //...
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22

      io.netty.channel.AbstractChannel.AbstractUnsafe#register0

      //4.2 将 NioSocketChannel 注册到新的 NioEventLoop 线程中
      private void register0(ChannelPromise promise) {
          try {
              if (!promise.setUncancellable() || !ensureOpen(promise)) {
                  return;
              }
              boolean firstRegistration = neverRegistered;
              // 4.2.1 将 NioSocketChannel 注册到新的 selector 上
              // 注意此时没有注册 selector 关注事件,附件为当前的 NioSocketChannel
              // this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
              doRegister();
              neverRegistered = false;
              registered = true;
      		
              //4.2.2 执行初始化器:我们给NioSocketChannel写的 chileHandler—>initChannel
              //执行前 pipeline 中只有 head -> 初始化器 -> tail
              pipeline.invokeHandlerAddedIfNeeded();
              //执行后就是 head -> logging handler -> tail
      
              safeSetSuccess(promise);
              pipeline.fireChannelRegistered();
              
              if (isActive()) {
                  if (firstRegistration) {
                      //4.2.3 在新的 selector 上关注 read 事件
                      // 触发 pipeline 上 active 事件
                      pipeline.fireChannelActive();
                  } else if (config().isAutoRead()) {
                      beginRead();
                  }
              }
          } catch (Throwable t) {
              closeForcibly();
              closeFuture.setClosed();
              safeSetFailure(promise, t);
          }
      }
      	//4.2.2 执行初始化器:我们给NioSocketChannel写的 chileHandler—>initChannel
          public static void main(String[] args) {
              NioEventLoopGroup group = new NioEventLoopGroup(2);
              try {
                  Channel channel = new ServerBootstrap()
                          .childHandler(new ChannelInitializer<NioSocketChannel>() {
                              @Override
                              protected void initChannel(NioSocketChannel channel) throws Exception {
                                  //添加日志处理器
                                  channel.pipeline().addLast(new LoggingHandler());
                              }
                          })
                      //...
              }
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52

      回到了熟悉的代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

      //4.2.3 在新的 selector 上关注 read 事件
      public void channelActive(ChannelHandlerContext ctx) {
          ctx.fireChannelActive();
          //关注 read 事件(NioSocketChannel 这里 read,只是为了触发 channel 的事件注册,还未涉及数据读取)
          this.readIfIsAutoRead();
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6

      io.netty.channel.nio.AbstractNioChannel#doBeginRead

      //关注 read 事件
      protected void doBeginRead() throws Exception {
          SelectionKey selectionKey = this.selectionKey;
          if (selectionKey.isValid()) {
              this.readPending = true;
              int interestOps = selectionKey.interestOps();//这时候 interestOps 是 0
              if ((interestOps & this.readInterestOp) == 0) {
                  //关注 read 事件
                  selectionKey.interestOps(interestOps | this.readInterestOp);
              }
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12

      2.4 read 剖析

      再来看可读事件 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read,注意发送的数据未必能够一次读完,因此会触发多次 nio read 事件,一次事件内会触发多次 pipeline read,一次事件会触发一次 pipeline read complete

      public final void read() {
          final ChannelConfig config = config();
          if (shouldBreakReadReady(config)) {
              clearReadPending();
              return;
          }
          final ChannelPipeline pipeline = pipeline();
          //1.获取 byteBuf 分配器:决定是池化还是非池化的
          // io.netty.allocator.type 决定 allocator 的实现
          final ByteBufAllocator allocator = config.getAllocator();
          //2.动态调整 byteBuf 的分配大小,并且强制使用直接内存
          final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
          a llocHandle.reset(config);
      
          ByteBuf byteBuf = null;
          boolean close = false;
          try {
              do {
                  byteBuf = allocHandle.allocate(allocator);
                  //3.读取到 byteBuf
                  allocHandle.lastBytesRead(doReadBytes(byteBuf));
                  if (allocHandle.lastBytesRead() <= 0) {
                      byteBuf.release();
                      byteBuf = null;
                      close = allocHandle.lastBytesRead() < 0;
                      if (close) {
                          readPending = false;
                      }
                      break;
                  }
      
                  allocHandle.incMessagesRead(1);
                  readPending = false;
                  //4.触发 read 事件,把 ByteBuf 依次传给流水线中的handler 处理,这时是处理 NioSocketChannel 上的 handler
                  pipeline.fireChannelRead(byteBuf);
                  byteBuf = null;
              } 
              //5.是否要继续循环
              while (allocHandle.continueReading());
      
              allocHandle.readComplete();
              // 触发 read complete 事件
              pipeline.fireChannelReadComplete();
      
              if (close) {
                  closeOnRead(pipeline);
              }
          } catch (Throwable t) {
              handleReadException(pipeline, byteBuf, t, close, allocHandle);
          } finally {
              if (!readPending && !config.isAutoRead()) {
                  removeReadOp();
              }
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55

      io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading(io.netty.util.UncheckedBooleanSupplier)

      //5.是否要继续循环
      public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
          return 
                 // 一般为 true
                 config.isAutoRead() &&
                 // respectMaybeMoreData 默认为 true
                 // maybeMoreDataSupplier 的逻辑是如果预期读取字节与实际读取字节相等,返回 true
                 (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
                 // 小于最大次数,maxMessagePerRead 默认 16
                 totalMessages < maxMessagePerRead &&
                 // 实际读到了数据
                 totalBytesRead > 0;
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
    • 相关阅读:
      SpringCloudConfig分布式配置中心
      Object转List<>,转List<Map<>>
      二分查找实例1(在排序数组中查找元素的第一个和最后一个位置)
      影刀掌握手头,仿佛自由人--更符合中国宝宝体质的自动化工具
      Zabbix最新6.2安装及使用!
      复习十:栈与递归的实现
      基于微信小程序云开(统计学生信息并导出excel)2.0版
      混淆加密JS,可以压缩代码体积吗?
      Java关于MongoTemplate的增删改查实战代码解析(全)
      webpack5 PWA解决Web App 项目网络离线情况没法访问情况
    • 原文地址:https://blog.csdn.net/weixin_43401592/article/details/127721041