最近看了一下netty,还是把学习的总结一下,不然过几天全忘记了。
一个提供异步事件驱动的网络应用框架,在java io的基础上进行了包装开发,特性:高性能、可维护、快速开发。
使用例子见官网:Netty.docs: User guide for 5.x
主线程
2.2.1创建Selector
2.2.2创建ServerSocketChannel 与初始化ServerSocketChannel
- io.netty.bootstrap.AbstractBootstrap#doBind:
- final ChannelFuture regFuture = this.initAndRegister();
2.2.4给ServerSocketChannel选择运行NioEventLoop
- io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel):
-
- //选择一个处理注册事件
- public EventExecutor next() {
- return this.chooser.next();
- }
NioEventLoop 线程:
2.2.5 ServerSocketChannel 注册到Selector
- io.netty.channel.AbstractChannel.AbstractUnsafe#register:
-
- //eventLoop 处理
-
- if (eventLoop.inEventLoop()) {
- this.register0(promise);
- } else {
- try {
- eventLoop.execute(new OneTimeTask() {
- public void run() {
- AbstractUnsafe.this.register0(promise);
- }
- });
- }
2.2.6 绑定地址启动
- io.netty.channel.socket.nio.NioServerSocketChannel#doBind
-
- this.javaChannel().bind(localAddress)
2.2.7 注册接受连接事件(OP_ACCEPT)到Selector 上
下图主要展示的是服务端流程,客户端与服务端有一些不同:启动类是Bootstrap,直接在第2步时就创建NioSocketChannel,并将NioSocketChannel注册给Selector,注册完成后initChannel,之后解析连接,并向服务端发起连接事件。待连接完成后激活,注册读事件到Selector,调用ChannelActive-》beginRead-》selectionKey.interestOps(interestOps | this.readInterestOp)。
在netty中核心事件的处理都是用的责任链模式,核心方法定义在接口ChannelHandlerInvoker中,并在相对应的方法还在接口ChannelHandlerContext、ChannelPipeline 有定义。调用过程为:ChannelPipeline-》ChannelHandlerContext--》ChannelHandlerInvoker--》具体实现Handler。
责任链上下文AbstractChannelHandlerContext ,责任处理器的接口在ChannelPipeline中,Pipeline会有两指针指向头尾责任链头尾节点,Inbound事件从头结点开始向尾遍历,Outbound事件从尾节点开始向头遍历。
在调用时,首先获取到pipeline,再通过pipeline调用方法,例如注册方法
AbstractChannel.this.pipeline.fireChannelRegistered();
DefaultChannelPipeline:
- public ChannelPipeline fireChannelRegistered() {
- this.head.fireChannelRegistered();
- return this;
- }
AbstractChannelHandlerContext 中包含的重要成员变量:
- volatile AbstractChannelHandlerContext next;
- volatile AbstractChannelHandlerContext prev;
- private final AbstractChannel channel;
- private final DefaultChannelPipeline pipeline;
- final int skipFlags;
- private final ChannelHandler handler;
- public ChannelHandlerContext fireChannelRegistered() {
- AbstractChannelHandlerContext next = this.findContextInbound();
- next.invoker().invokeChannelRegistered(next);
- return this;
- }
-
-
- private AbstractChannelHandlerContext findContextInbound() {
- AbstractChannelHandlerContext ctx = this;
-
- do {
- ctx = ctx.next;
- } while((ctx.skipFlags & 2044) == 2044);
-
- return ctx;
- }
-
- public final ChannelHandlerInvoker invoker() {
- return (ChannelHandlerInvoker)(this.invoker == null ? this.channel().eventLoop().asInvoker() : this.wrappedEventLoop());
- }
其中AbstractChannel 包含PausableChannelEventLoop,PausableChannelEventLoop implements ChannelHandlerInvoker 接口,因此next.invoker().invokeChannelRegistered(next);可以调用ChannelHandlerInvoker的方法
AbstractChannel.this.eventLoop = AbstractChannel.this.new PausableChannelEventLoop(eventLoop);
DefaultChannelHandlerInvoker:具体任务执行交给NioEventLoop,如果当前运行的线程时NioEventLoop,直接调用,否则提交给executor(例如单线运行的线程是main)
- public void invokeChannelRegistered(final ChannelHandlerContext ctx) {
- if (this.executor.inEventLoop()) {
- ChannelHandlerInvokerUtil.invokeChannelRegisteredNow(ctx);
- } else {
- this.executor.execute(new OneTimeTask() {
- public void run() {
- ChannelHandlerInvokerUtil.invokeChannelRegisteredNow(ctx);
- }
- });
- }
-
- }
具体调用在ChannelHandlerInvokerUtil中,调用通过findContextInbound找到的上下文中的hander了
- public static void invokeChannelRegisteredNow(ChannelHandlerContext ctx) {
- try {
- ctx.handler().channelRegistered(ctx);
- } catch (Throwable var2) {
- notifyHandlerException(ctx, var2);
- }
-
- }
class ChannelInitializerextends ChannelHandlerAdapter:
方法执行后想责任链继续往下走,会继续调用 ctx.fireChannelRegistered();
- public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
- ChannelPipeline pipeline = ctx.pipeline();
- boolean success = false;
-
- try {
- this.initChannel(ctx.channel());
- pipeline.remove(this);
- ctx.fireChannelRegistered();
- success = true;
- } catch (Throwable var8) {
- logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), var8);
- } finally {
- if (pipeline.context(this) != null) {
- pipeline.remove(this);
- }
-
- if (!success) {
- ctx.close();
- }
-
- }
-
- }
读数据技巧:自适应分配器
1、Netty 待写数据太多,超过一定的水位线( writeBufferWaterMark .high) () ),会将可写的标志位改成false ,让应用端自己做决定要不要发送数据了
2、只要有数据要写,且能写,则一直尝试,直到 16 次( writeSpinCount ),写 16 次还没有写完,就直接 schedule 一个 task 来继续写,而不是用注册写事件来触发,更简洁有力。
3、批量写数据时,如果尝试写的都写不进去了,接下来会尝试写更多(maxBytesPerGatheringWrite )
在netty中,NioEventLoop会执行任务,其中线程池才用的是ForkJoinPool,
在netty中的ForkJoinPo是netty自己定义的。nEventExecutors默认16。
- eventLoop.execute(new OneTimeTask() {
- public void run() {
- AbstractUnsafe.this.register0(promise);
- }
- });
任务添加过程下图所示:
run:
唤醒selector,如果selector阻塞等待有事件注册,执行后selector直接返回
- protected void wakeup(boolean inEventLoop) {
- if (!inEventLoop && this.wakenUp.compareAndSet(false, true)) {
- this.selector.wakeup();
- }
-
- }
在里面设置任务队列数,与java11中设置任务队列数一样。
- WorkQueue[] nws;
- label214: {
- nps = this.parallelism;
- s = nps > 1 ? nps - 1 : 1;
- s |= s >>> 1;
- s |= s >>> 2;
- s |= s >>> 4;
- s |= s >>> 8;
- s |= s >>> 16;
- s = s + 1 << 1;
- nws = (ws = this.workQueues) != null && ws.length != 0 ? null : new WorkQueue[s];
- protected ForkJoinWorkerThread(ForkJoinPool pool) {
- super("aForkJoinWorkerThread");
- this.pool = pool;
- this.workQueue = pool.registerWorker(this);
- }
WorkQueue
- WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode, int seed) {
- this.pool = pool;
- this.owner = owner;
- this.mode = (short)mode;
- this.hint = seed;
- this.base = this.top = 4096;
- }
这很像WorkStealingPool,Executors.newWorkStealingPool();其底层也是ForkJoinPool。
jdk11中的ForkJoinPool:与ThreadPoolExecutor创建的线程池不一样,ThreadPoolExecutor是所有线程共用一个任务队列,ForkJoinPool有多个任务队列。
参数:
int parallelism,并行度,
ForkJoinWorkerThreadFactory factory,创建线程的工程
UncaughtExceptionHandler handler:在内部线程执行时发生不可恢复的错误终止时的处理程序。
asyncMode:如果为ture,则为从未加入的 Fork任务建立本地先进先出调度模式。
corePoolSize:线程池中保留存活的线程数。
maximumPoolSize:线程池允许的最大线程数。
minimumRunnable:最小没有阻塞的线程数,如果小于,则新创建线程,直到达到最大线程数。
Predicate super ForkJoinPool> saturate:如果该参数不为空,当尝试创建超过最大线程数的线程时,则会调用Predicate。Predicate返回ture,不会抛出异常,线程池会继续以小于目标可运行的线程数继续运行,但不能保证进度。否则,当一个线程被阻塞,由于达到最大线程数,无法替换时,抛出异常RejectedExecutionException。
keepAliveTime:自从最后一次被使用后的存活时间。
unit:存活时间单位。
ForkJoinPool的任务队列数:parallelism<1,n=4,否则parallelism-1后的二进制最高位1(其余位为0)往左移2位,这就是任务队列数。
- public ForkJoinPool(int parallelism,
- ForkJoinWorkerThreadFactory factory,
- UncaughtExceptionHandler handler,
- boolean asyncMode,
- int corePoolSize,
- int maximumPoolSize,
- int minimumRunnable,
- Predicate<? super ForkJoinPool> saturate,
- long keepAliveTime,
- TimeUnit unit) {
- // check, encode, pack parameters
- if (parallelism <= 0 || parallelism > MAX_CAP ||
- maximumPoolSize < parallelism || keepAliveTime <= 0L)
- throw new IllegalArgumentException();
- if (factory == null)
- throw new NullPointerException();
- long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
-
- int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP);
- long c = ((((long)(-corep) << TC_SHIFT) & TC_MASK) |
- (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
- int m = parallelism | (asyncMode ? FIFO : 0);
- int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism;
- int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
- int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH);
- int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots
- n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
- n = (n + 1) << 1; // power of two, including space for submission queues
-
- this.workerNamePrefix = "ForkJoinPool-" + nextPoolId() + "-worker-";
- this.workQueues = new WorkQueue[n];
- this.factory = factory;
- this.ueh = handler;
- this.saturate = saturate;
- this.keepAlive = ms;
- this.bounds = b;
- this.mode = m;
- this.ctl = c;
- checkPermission();
- }
使用例子:
一般我们定义任务的时候是从Runnable来继承,但ForkJionPoll需要定义成ForkJoinTask,在下面的例子中将使用RecursiveTask(继承了ForkJoinTask),会递归的把大任务切分成小任务,直到满足条件为止。例外还有一个特别相似的RecursiveAction,这个没有返回值,前面的RecursiveTask有返回值。
计算一个 给定数组的累加和。
- public class T1_ForkJoinPool {
- static int[] nums =new int[1000000];
- static final int MAX_NUM=50000;
- static Random r =new Random();
- static {
- for(int i=0;i<nums.length;i++){
- nums[i]=r.nextInt(100);
- }
- int sum = 0;
- for (int num : nums) {
- sum += num;
- }
- System.out.println("----"+ sum);
- }
- static class AddTaskRet extends RecursiveTask<Long> {
- private static final long serialVersionUID=1L;
- int start,end;
-
- public AddTaskRet(int start, int end) {
- this.start = start;
- this.end = end;
- }
-
- @Override
- protected Long compute() {
- if(end-start<=MAX_NUM){
- long sum =0L;
- for(int i=start;i<end;i++){
- sum+=nums[i];
- }
- return sum;
- }
- int middle =start+(end-start)/2;
- AddTaskRet sub1=new AddTaskRet(start,middle);
- AddTaskRet sub2= new AddTaskRet(middle,end);
- sub1.fork();
- sub2.fork();
- return sub1.join()+sub2.join();
- }
- }
-
- public static void main(String[] args) {
- T1_ForkJoinPool t1_forkJoinPool=new T1_ForkJoinPool();
- ForkJoinPool joinPool =new ForkJoinPool();
- AddTaskRet taskRet =new AddTaskRet(0,nums.length);
- joinPool.execute(taskRet);
- long result=taskRet.join();
- System.out.println(result);
- }
- }
- //FileChannel 文件读写、映射和操作的通道
- FileChannel fileChannel = new FileInputStream(fileName).getChannel();
- long startTime = System.currentTimeMillis();
- //transferTo⽅法⽤到了零拷⻉,底层是sendfile,这里只需要发生2次copy和2次上下文切换
- long transferCount = fileChannel.transferTo(0, fileChannel.size(), socketChannel);