• 全流程分析Netty设计思路与实践


    1. Netty基本概念

    Netty通用代码:

    创建两个线程池,分别用于处理ServerSocket事件和Socket事件;并指定ServerSocket和Socket发生事件时执行自定义类ServerHandler中的方法:

    Netty业务代码:

    ServerHander定义了方法,当服务端接受到了客户端发送的数据时,调用channelRead方法处理数据;当socket/serverSocket注册到selector中时,调用channelRegistered:

    上述代码中,netty架构图如下所示:

    从Netty架构图中可以看到NioEventLoopGroup和pipeline是最重要的概念,后面将会从Netty工作流程详细分析这两个概念的实现思想。

    2. Netty工作流程

    2.1 创建bossGroup和workerGroup对象

    如下,bossGroup对应NioEventLoopGroup创建1个NioEventLoop,workerGroup创建10个NioEventLoop。每个NioEventLoop内部包含一个新的多路复用器Selector和线程,bossGroup的selector用于注册serverSocketChannel,workerGroup的selector用于注册socketChannel。在线程中则是处理selector注册的socket上发生的事件。

    NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
    NioEventLoopGroup workerGroup = new NioEventLoopGroup(10);

    NioEventLoopGroup从子类到父类的初始化顺序为:NioEventLoopGroup -> MultithreadEventLoopGroup -> MultithreadEventExecutorGroup。

    注意:bossGroup只需要指定创建1个NioEventLoop,因为服务端只有一个ServerSocketChannel对象,根本没办法注册到多个selector中。没有任何资料能够实际展示bossGroup中的多个NioEventLoop。

    2.1.1 创建SelectorProvider

    SelectorProvider是Selector多路复用的工厂类,用于创建Selector的实现类。NioEventLoopGroup初始化时,创建了SelectorProvider对象:

    public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }

    SelectorProvider类通过rt.jar包中的sun.nio.ch.DefaultSelectorProvider类调用create方法,创建SelectorProvider实现:

    public abstract class SelectorProvider {
        public static SelectorProvider provider() {
            synchronized (lock) {
                if (provider != null)
                    return provider;
                return AccessController.doPrivileged(
                    new PrivilegedAction() {
                        public SelectorProvider run() {
                                if (loadProviderFromProperty())
                                    return provider;
                                if (loadProviderAsService())
                                    return provider;
                                provider = sun.nio.ch.DefaultSelectorProvider.create();
                                return provider;
                            }
                        });
            }
        }
    }

    不同操作系统的jdk包中rt.jar包中DefaultSelectorProvider实现不同,例如mac os的create方法返回KQueueSelectorProvider对象:

    public class DefaultSelectorProvider {
        private DefaultSelectorProvider() {
        }
    
        public static SelectorProvider create() {
            return new KQueueSelectorProvider();
        }
    }

    linux操作系统rt.jar包的create方法返回EPollSelectorProvider对象:

    public class DefaultSelectorProvider {
        private DefaultSelectorProvider() {
        }
    
        public static SelectorProvider create() {
            String var0 = (String)AccessController.doPrivileged(new GetPropertyAction("os.name"));
            if (var0.equals("SunOS")) {
                return createProvider("sun.nio.ch.DevPollSelectorProvider");
            } else {
                return (SelectorProvider)(var0.equals("Linux") ? createProvider("sun.nio.ch.EPollSelectorProvider") : new PollSelectorProvider());
            }
        }
    }

    EPollSelectorProvider可以通过openSelector方法创建EPollSelectorImpl对象:

    public class EPollSelectorProvider extends SelectorProviderImpl {
        public EPollSelectorProvider() {
        }
    
        public AbstractSelector openSelector() throws IOException {
            return new EPollSelectorImpl(this);
        }
    }

    EPollSelectorImpl最底层封装了socket系统调用epoll_create、epoll_ctl,完成多路复用功能。

    2.1.2 创建线程池

    有了SelectorProvider,就可以创建线程执行器Executor了。线程池中每一个线程的创建动作由DefaultThreadFactory定义。Executor直接从线程池中使用一个线程:

    public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
        protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
            //创建线程执行器,
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
            //省略
        }
    
        //创建线程池
        protected ThreadFactory newDefaultThreadFactory() {
            return new DefaultThreadFactory(getClass());
        }
    }

    线程池的初始化操作如下:

    public class DefaultThreadFactory implements ThreadFactory {
        public DefaultThreadFactory(Class poolType) {
            this(poolType, false, Thread.NORM_PRIORITY);
        }
    
        public DefaultThreadFactory(Class poolType, boolean daemon, int priority) {
            this(toPoolName(poolType), daemon, priority);
        }
    
        public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
            ObjectUtil.checkNotNull(poolName, "poolName");
    
            if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
                throw new IllegalArgumentException(
                        "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
            }
            //使用统一的前缀作为线程名
            prefix = poolName + '-' + poolId.incrementAndGet() + '-';
            this.daemon = daemon;
            this.priority = priority;
            this.threadGroup = threadGroup;
        }
    
        //可以调用newThread直接创建一个线程
        public Thread newThread(Runnable r) {
            Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
            try {
                if (t.isDaemon() != daemon) {
                    t.setDaemon(daemon);
                }
    
                if (t.getPriority() != priority) {
                    t.setPriority(priority);
                }
            } catch (Exception ignored) {
                // Doesn't matter even if failed to set.
            }
            return t;
        }
    }

    定义了线程名前缀:

    后续创建线程时,使用线程名做前缀:

    ThreadPerTaskExecutor调用execute时,直接从线程池中创建一个新线程:

    public final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;
    
        public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
        }
    
        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }

    2.1.3 封装线程池和Selector

    通过创建SelectorProvider和Executor两个重要依赖后,就可以构造NioEventLoop了:

    public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
        protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
            //创建线程池
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
    
            children = new EventExecutor[nThreads];
            //创建NioEventLoop,bossGroup指定1个NioEventLoop,workerGroup指定10个NioEventLoop
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    //创建NioEventLoop
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    //省略
                }
            }
            chooser = chooserFactory.newChooser(children);
    
            //省略
        }
    
        //创建NioEventLoop的方法由NioEventLoopGroup类实现
        protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;
    }

    NioEventLoopGroup实现了newChild方法,创建NioEventLoop对象:

    public class NioEventLoopGroup extends MultithreadEventLoopGroup {
        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
            return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
        }
    }

    NioEventLoop中,通过openSelector()方法创建selector,也就是EPollSelectorImpl对象。

    public final class NioEventLoop extends SingleThreadEventLoop {
        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                     EventLoopTaskQueueFactory queueFactory) {
            super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                    rejectedExecutionHandler);
            this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
            this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
            final SelectorTuple selectorTuple = openSelector();
            this.selector = selectorTuple.selector;
            this.unwrappedSelector = selectorTuple.unwrappedSelector;
        }
    
        private SelectorTuple openSelector() {
            final Selector unwrappedSelector;
            try {
                //创建EPollSelectorImpl对象
                unwrappedSelector = provider.openSelector();
            } catch (IOException e) {
                throw new ChannelException("failed to open a new selector", e);
            }
            //省略
            return new SelectorTuple(unwrappedSelector);
        }
    }

    1.2 NioEventLoopGroup总结

    NioEventLoopGroup包含多个NioEventLoop。每个NioEventLoop内部包含一个新的多路复用器Selector和线程,bossGroup的selector用于注册serverSocketChannel,workerGroup的selector用于注册socketChannel。每个NioEventLoop中,都包含一个Selector以及一个线程,线程暂时用ThreadPerTaskExecutor表示,执行ThreadPerTaskExecutor#executor就会创建NioEventLoop专属的线程。

    1.3 创建启动类ServerBootstrap对象

    ServerBootstrap是启动类,将NioEventLoopGroup等参数传递到ServerBootstrap中,ServerBootstrap负责启动netty服务端。

    1.3.1 指定SeverSocketChannel的实现类

    指定NioServerSocketChannel作为netty的SeverSocketChannel实现类:

    serverBootstrap.channel(NioServerSocketChannel.class);

    NioServerSocketChannel的构造函数通过EPollSelectorProvider创建ServerSocketChannel对象

    public class NioServerSocketChannel extends AbstractNioMessageChannel
                                 implements io.netty.channel.socket.ServerSocketChannel {
        //DEFAULT_SELECTOR_PROVIDER就是EPollSelectorProvider对象
        private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    
        public NioServerSocketChannel() {
            this(newSocket(DEFAULT_SELECTOR_PROVIDER));
        }
    
        private static ServerSocketChannel newSocket(SelectorProvider provider) {
            try {
                //通过EPollSelectorProvider的父类SelectorProviderImpl的openServerSocketChannel()方法创建ServerSocketChannel对象。
                return provider.openServerSocketChannel();
            } catch (IOException e) {
                throw new ChannelException(
                        "Failed to open a server socket.", e);
            }
        }
    
        public NioServerSocketChannel(ServerSocketChannel channel) {
            super(null, channel, SelectionKey.OP_ACCEPT);
            config = new NioServerSocketChannelConfig(this, javaChannel().socket());
        }
    }

    NioServerSocketChannel通过父类的AbstractNioChannel构造方法设置ServerSocketChannel为非阻塞:

    public abstract class AbstractNioChannel extends AbstractChannel {
        protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            this.ch = ch;
            this.readInterestOp = readInterestOp;
            try {
                ch.configureBlocking(false);
            } catch (IOException e) {
                try {
                    ch.close();
                } catch (IOException e2) {
                    logger.warn(
                                "Failed to close a partially initialized socket.", e2);
                }
    
                throw new ChannelException("Failed to enter non-blocking mode.", e);
            }
        }
    }

    NioServerSocketChannel的父类AbstractChannel会为ServerSocketChannel创建对应的Unsafe和Pipeline,这个后面再展开:

    public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
        protected AbstractChannel(Channel parent) {
            this.parent = parent;
            id = newId();
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }
    
        protected abstract AbstractUnsafe newUnsafe();
    
        protected DefaultChannelPipeline newChannelPipeline() {
            return new DefaultChannelPipeline(this);
        }
    }

    1.3.2 配置handler

    handler表示socket发生事件时,应该执行的操作。

    serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
                        .handler(new ChannelInitializer() {
                            @Override
                            protected void initChannel(ServerSocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new ServerHandler());
                            }
                        })
                        .childHandler(new ChannelInitializer() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new ServerHandler());
                            }
                        });

    ServerBootstrap的父类AbstractBootstrap保存ServerSocketChannel对应的handler:

    public abstract class AbstractBootstrap, C extends Channel> implements Cloneable {
        public B handler(ChannelHandler handler) {
            this.handler = ObjectUtil.checkNotNull(handler, "handler");
            return self();
        }
    }

    ServerBootstrap保存SocketChannel对应的childHander:

    public class ServerBootstrap extends AbstractBootstrap {
        public ServerBootstrap childHandler(ChannelHandler childHandler) {
            this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
            return this;
        }
    }

    1.4 netty服务端启动

    通过ServerBootstrap#bind方法启动netty服务端:

    ChannelFuture future = serverBootstrap.bind(8080).sync();

    1.4.1 创建ServerSocketChannel

    调用ServerBootstrap的父类AbstractBootstrap的doBind方法,通过AbstractBootstrap#initAndRegister开始创建ServerSocketChannel:

    public abstract class AbstractBootstrap, C extends Channel> implements Cloneable {
        private ChannelFuture doBind(final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
            //省略
       }
    
       //创建ServerSocketChannel
       final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                channel = channelFactory.newChannel();
                init(channel);
            } catch (Throwable t) {
                //省略
            }
            //省略
    }

    从上面的AbstractBootstrap#initAndRegister可以看到channelFactory#newChannel方法,它就调用了NioServerSocketChannel的构造函数,而NioServerSocketChannel构造函数里面就创建了ServerSocketChannel,并设置了非阻塞。

    1.4.2 初始化pipeline

    在创建完NioServerSocketChannel后,通过init方法,将主程序中定义的的Handler放到NioServerSocketChannel的pipeline中:

    public class ServerBootstrap extends AbstractBootstrap {
        void init(Channel channel) {
            setChannelOptions(channel, newOptionsArray(), logger);
            setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
    
            ChannelPipeline p = channel.pipeline();
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry, Object>[] currentChildOptions;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
            }
            final Entry, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
    
            p.addLast(new ChannelInitializer() {
                @Override
                public void initChannel(final Channel ch) {
                    final ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
    
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    }

    1.4.3 DefaultChannelPipeline插入元素

    pipeline基于设计模式中的责任链模式。责任链模式为请求创建了一个处理对象的链。发起请求和具体处理请求的过程进行解耦:职责链上的处理者(Handler)负责处理请求,客户只需要将请求发送到职责链上即可,无须关心请求的处理细节和请求的传递。

    当用户发起请求时,服务端逐步调用Inbound Handler,响应用户请求时,服务端逐步调用Outbound Handler。如下所示:

    在创建ServerSocketChannel时,创建了NioEventLoop对应的DefaultChannelPipeline对象,该pipeline专属于ServerSocketChannel。

    如下可以看到,DefaultChannelPipeline就是一个链表结构,每次addLast方法插入一个handler,就将handler封装成DefaultChannelHandlerContext,加入到链表结尾:

    public class DefaultChannelPipeline implements ChannelPipeline {
        final AbstractChannelHandlerContext head;
        final AbstractChannelHandlerContext tail;
    
        public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            final AbstractChannelHandlerContext newCtx;
            synchronized (this) {
                checkMultiplicity(handler);
    
                newCtx = newContext(group, filterName(name, handler), handler);
    
                addLast0(newCtx);
    
                // If the registered is false it means that the channel was not registered on an eventLoop yet.
                // In this case we add the context to the pipeline and add a task that will call
                // ChannelHandler.handlerAdded(...) once the channel is registered.
                if (!registered) {
                    newCtx.setAddPending();
                    callHandlerCallb
  • 相关阅读:
    多极神经元手绘图作业,多极运动神经元手绘图
    Nacos注册中心
    我的Windows10下的WSL的使用经历
    备份和恢复Gitlab数据
    第三十七章 在 UNIX®、Linux 和 macOS 上使用 IRIS(二)
    第4章 文件IO
    缓存篇—缓存击穿
    台湾SSS鑫创SSS1700替代Cmedia CM6533 24bit 96KHZ USB音频编解码芯片
    ardupilot开发 --- 外设适配器、外设拓展、AP_Periph 篇
    硬盘驱动器基础知识
  • 原文地址:https://blog.csdn.net/lt_xiaodou/article/details/126689248