• Netty - Reactor线程模型解析


    • 阻塞:读取某个资源,等待资源读取完成,再执行后续操作
    • 非阻塞:读取某个资源,立即返回,若无法给出结果,返回一个预定义好的值

    一个非阻塞的例子:列表中有三份资源,在 while 循环中不断遍历,如果资源读取成功,从列表移除

    list = [resource1, resource2, resource3];
    while (!list.Empty()){	// 列表不为空
    	for (resource of list) {	// 遍历列表
    		data = read(resource);	// 读取资源
    		if (data !== null) { 
    			list.remove(resource) ;
    		} else {
    			continue;
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    缺点:在绝大多数情况下,迭代的资源,都还未将数据准备好,因此,白白浪费了 CPU

    如何优化?
    定义一个监听器,监听列表,当列表中有资源读取成功,发送消息给监听器,监听器执行之后相应操作

    while (events = watch(list)) {	// 监听 list,如果有资源读取完成,放入 events 中
    	for (event of events) {		// 遍历 events,读取(读取完成的资源)
    		events.remove(event);
    		list.remove(event);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    reactor 模式:是一种处理 I/O 操作的模式,如果受监控的某资源产生新事件,那就接触阻塞,对事件做出反应

    极简 netty 服务器

    NioEventLoopGroup boosGroup = new NioEventLoopGroup();
    NioEventLoopGroup workerGroup = new NioEventLoopGroup();
    new ServerBootstrap()
        .group(boosGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            protected void initChannel(NioSocketChannel ch) { }
        }).bind(8080);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    创建 NioEventLoopGroup 流程

    在 MultithreadEventExecutorGroup 中,主要做了 3 件事

    1. 创建 ThreadPerTaskExecutor
    2. 创建 NioEventLoop
    3. 创建线程选择器
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        // ......
        
        if (executor == null) {
        	// 1 创建 ThreadPerTaskExecutor
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
    
    	// 2 创建 NioEventLoop
        children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i ++) {
            try {
                children[i] = newChild(executor, args);
            }
        }
        
    	// 3 创建线程选择器
        chooser = chooserFactory.newChooser(children);
    
        // ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    对应流程图如下

    看源码时,记得留意类的英文名,对应的中文意思,见名思意

    在这里插入图片描述

    bind(port) - register

    大致如下
    bind -> doBind
    doBind() 中 有 initAndRegister() 方法,大致行为如下:

    1. 创建服务器 channel:channel = channelFactory.newChannel()
    2. 初始化服务器 channel:init(channel)
    3. 注册服务器 channel:config().group().register(channel)

    SingleThreadEventLoop - 单个线程事件循环

    在 3 register 中,有如下语句

    public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
        public ChannelFuture register(final ChannelPromise promise) {
            // ......
            promise.channel().unsafe().register(this, promise);		// <--- 传入 this,之后通过 this 调用该抽象类的 execute 方法
            return promise;
        }
    }
    
    public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    	// 内部类
    	protected abstract class AbstractUnsafe implements Unsafe {
    		public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    			// inEventLoop:当前线程是否是 Netty 的 Reactor 线程,这里返回 false
    			if (eventLoop.inEventLoop()) {
    			   register0(promise);
    			} else {
    				try {
    				    eventLoop.execute(new Runnable() {		// <--- 调用 SingleThreadEventExecutor # execute()
    				        @Override
    				        public void run() {
    				            register0(promise);
    				        }
    				    });
    			    }
    			} 
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    SingleThreadEventExecutor # execute() - 单个线程事件执行器

    public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
        public void execute(Runnable task) {
            // ......
            boolean inEventLoop = inEventLoop();
            if (inEventLoop) {
                addTask(task);
            } else {
                startThread();		// <--- 调用
                addTask(task);
                // ......
            }
    		// ......
        }
        
        private void startThread() {
            if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
                if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                    doStartThread();		// <--- 调用
                }
            }
        }
    
        private void doStartThread() {
            assert thread == null;
            executor.execute(new Runnable() {		// <--- 调用 ThreadPerTaskExecutor 的 execute 方法
                @Override
                public void run() {
                    thread = Thread.currentThread();
                    // ......
                    try {
                        SingleThreadEventExecutor.this.run();
                    } // ......
                }
            });
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    executor 为 NioEventLoop

    这里的 executor 就是:创建 NioEventLoopGroup 流程中,第一步,创建的 ThreadPerTaskExecutor,之后在创建 NioEventLoop 时,传入 executor,之后, newChild 的时候,将其存入 SingleThreadEventExecutor 类的 executor

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        // ......
        if (executor == null) {
        	// 1 创建 ThreadPerTaskExecutor
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
    
    	// 2 创建 NioEventLoop
        children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i ++) {
            try {
                children[i] = newChild(executor, args);		// <----- 传入 ThreadPerTaskExecutor
            }
        }
        // ......
    }
    
    // newChild 最后会执行到这里
    public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
        protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                            boolean addTaskWakesUp, int maxPendingTasks,
                                            RejectedExecutionHandler rejectedHandler) {
            // .......
            this.executor = ObjectUtil.checkNotNull(executor, "executor");
            // .......
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    ThreadPerTaskExecutor # execute - 线程的每个任务执行器

    于是乎,调用 ThreadPerTaskExecutor 的 execute 方法,如下

    public final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;
    
        public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            if (threadFactory == null) {
                throw new NullPointerException("threadFactory");
            }
            this.threadFactory = threadFactory;
        }
    
        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();		// <----- 执行
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    public class DefaultThreadFactory implements ThreadFactory {
        public Thread newThread(Runnable r) {
            Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());		// <----- 执行
            // ......
            return t;
        }
    
        protected Thread newThread(Runnable r, String name) {
            return new FastThreadLocalThread(threadGroup, r, name);		// <----- 执行 FastThreadLocalThread.start()
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    执行 FastThreadLocalThread.start(),会执行对应的 Runnable

    public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
        private void doStartThread() {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    thread = Thread.currentThread();
                    // ......
                    try {
                        SingleThreadEventExecutor.this.run();		// <--- 调用 SingleThreadEventExecutor的 run 方法
                    } // ......
                }
            });
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    // TODO 待完成

  • 相关阅读:
    java对接支付宝支付
    Bash: export:”=”不是有效的标识符;Bash:cyber_launch command not found
    stm32、IO口、中断、串口、定时器讲解
    第四章:存储子系统 [计算机组成原理笔记](自用)
    肠道核心菌属——经黏液真杆菌属(Blautia),炎症肥胖相关的潜力菌
    IDEA中查看整个项目代码行数
    java毕业生设计宠物管理系统计算机源码+系统+mysql+调试部署+lw
    图论学习 c++Ford-Fulkerson 方法
    JPA 查询的类型
    超简单的抖音去水印
  • 原文地址:https://blog.csdn.net/qq_30763385/article/details/127585920