• java基于quasar实现协程池【后篇】


    java基于quasar实现协程池【前篇】java基于quasar实现协程池_爪哇盘古的博客-CSDN博客

    在上一个文章中讲述了通过仿照java自写线程池的方式改写成quasar协程池,功能可以说实现了效果,但是遇到了一个烦恼就是在协程阻塞过程中会疯狂报警告,如果您的项目有日志文件产生当遇到一个非常耗时的任务时后面的任务阻塞产生警告,那么该日志文件的体量是致命的!所以为了摆脱这个问题,不要尝试、不要猜。要看文档!【我对英文很不友好的】

    quasar纤程文档FiberExecutorScheduler (Quasar 0.8.0)

    在该文档中我发现了FiberExecutorScheduler类,这个类将是本文阐述quasar协程池的正确打开方式!【全是泪】

    分析经历:我一直对Quasar及其轻质纤维替代Threads感到好奇。那么quasar本身是否有自己的纤程池呢?于是看是翻阅文档,找吧挨个看吧!

    线程池ThreadPoolExecutor类的实现。

    int maxThreadPoolSize = 10;
    
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            maxThreadPoolSize,
            maxThreadPoolSize,
            10, TimeUnit.MINUTES,
            new ArrayBlockingQueue(maxThreadPoolSize),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );
    
    for (int i = 0; i < 100; i++) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // run some code
            }
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    上面的代码创建了一个具有10个线程的池,一个在池前的队列(该队列可以容纳10个元素)和一个拒绝策略(当队列已满时),以使主线程自己执行Runnable任务。当for循环创建100个可运行对象时,它们将在池中一次执行10个,排队等待10个,并且主线程自己拾取一个Runnable直到其他对象完成,然后主线程返回将Runnables添加到执行程序。

    每个光纤调度器调度的光纤,当您创建不带调度器的光纤时,将创建一个FiberForkJoinScheduler并将其分配给该光纤。

    简而言之,如果要管理线程池中的光纤,请使用FiberExecutorScheduler
    Quasar关于调度光纤的文档

    您的代码可能像这样

            //线程池任务数量
            int maxThreadPoolSize = 10;
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    maxThreadPoolSize,
                    maxThreadPoolSize,
                    10, TimeUnit.MINUTES,
                    new ArrayBlockingQueue(maxThreadPoolSize),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.CallerRunsPolicy()
            );
            //重点!通过FiberExecutorScheduler类将线程池传为协程池
            FiberExecutorScheduler scheduler = new FiberExecutorScheduler("FibersInAPool", executor);
            for (int i = 0; i < 100; i++) {
                int finalI = i;
                Fiber fiber = new Fiber<>(scheduler
                        , new SuspendableRunnable() {
                    @Override
                    public void run() throws SuspendExecution, InterruptedException {
                        // run some code
                        System.out.println(finalI);
                        Thread.sleep(1000);
                    }
                });
                fiber.start();
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    这个操作看着可能会很奇怪,一个纤程池干嘛要利用线程池来加载使用?这可能与java底层本身不支持协程有关吧【猜】,不过通过这种形式也有好处就是可以直接通过开始的线程池的一些功能。方便了使用习惯(无非就是创建个线程池通过FiberExecutorScheduler来改为协程池进行协程操作),这样对于springboot的bean也会更友好的实现了吧!毕竟是通过线程池创建bean!

    以下是实现效果:

    不会出现阻塞警告了

    光纤非常便宜,因此您根本不需要池(及其异步作业调度模型):只需启动光纤,并在每次需要新的顺序进程与其他进程同时运行时让其运行常规顺序代码即可。

    当然它也支持类似go的管道【看文档】,可以自行开发你的业务逻辑。

    Channel (Quasar 0.8.0)

    package testgrp;
     
    import java.util.concurrent.ExecutionException;
     
    import co.paralleluniverse.strands.SuspendableCallable;
    import co.paralleluniverse.strands.SuspendableRunnable;
    import co.paralleluniverse.strands.channels.Channels;
    import co.paralleluniverse.strands.channels.IntChannel;
     
    import co.paralleluniverse.fibers.Fiber;
    import co.paralleluniverse.fibers.SuspendExecution;
     
    /**
     * Increasing-Echo Quasar Example
     *
     * @author circlespainter
     */
    public class QuasarIncreasingEchoApp {
        static public Integer doAll() throws ExecutionException, InterruptedException {
            final IntChannel increasingToEcho = Channels.newIntChannel(0); // Synchronizing channel (buffer = 0)
            final IntChannel echoToIncreasing = Channels.newIntChannel(0); // Synchronizing channel (buffer = 0)
     
            Fiber increasing = new Fiber<>("INCREASER", new SuspendableCallable() { @Override public Integer run() throws SuspendExecution, InterruptedException {
                // The following is enough to test instrumentation of synchronizing methods
                // synchronized(new Object()) {}
     
                int curr = 0;
                for (int i = 0; i < 10 ; i++) {
                    Fiber.sleep(10);
                    System.out.println("INCREASER sending: " + curr);
                    increasingToEcho.send(curr);
                    curr = echoToIncreasing.receive();
                    System.out.println("INCREASER received: " + curr);
                    curr++;
                    System.out.println("INCREASER now: " + curr);
                }
                System.out.println("INCREASER closing channel and exiting");
                increasingToEcho.close();
                return curr;
            } }).start();
     
            Fiber echo = new Fiber("ECHO", new SuspendableRunnable() { @Override public void run() throws SuspendExecution, InterruptedException {
                Integer curr;
                while (true) {
                    Fiber.sleep(1000);
                    curr = increasingToEcho.receive();
                    System.out.println("ECHO received: " + curr);
     
                    if (curr != null) {
                        System.out.println("ECHO sending: " + curr);
                        echoToIncreasing.send(curr);
                    } else {
                        System.out.println("ECHO detected closed channel, closing and exiting");
                        echoToIncreasing.close();
                        return;
                    }
                }
            } }).start();
     
            try {
                increasing.join();
                echo.join();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
     
            return increasing.get();
        }
     
        static public void main(String[] args) throws ExecutionException, InterruptedException {
            doAll();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75

    我们大java真是无所不能,通过这次实践也是让我倍感骄傲!以后谁也别说什么java不支持协程了~

    先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小。自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。添加下方名片,即可获取全套学习资料哦

  • 相关阅读:
    软件项目管理 6.10.成本预算
    Spring的国际化消息
    使用Microsoft.SemanticKernel基于本地运行的Ollama大语言模型实现Agent调用函数
    【普通人解题】leetcode 62. 不同路径
    scipy.optimize.leastsq()拟合函数
    MindSpore梯度进阶操作
    GCC详解的-Wl选项说明与测试
    数据结构和算法(10):B-树
    httpClient超时时间详解与测试案例
    【毕业设计源码】基于Python的校园生活助手(二手+活动+论坛+新闻)信息系统
  • 原文地址:https://blog.csdn.net/emgexgb_sef/article/details/126106231