• Android~RxJava实现newSingleThreadExecutor()同等效果



    背景

    最近有一个项目需求,考虑到性能问题,需要使用单线程顺序执行,java中自然是Executors.newSingleThreadExecutor()。但我想用rxjava实现一样的效果,本篇文章记录rxjava2实现newSingleThreadExecutor同样的效果。


    一、RxJava中的线程

    rxjava中的线程调度管理如下图,线程都是在SchedulerPoolFactory类的create中创建的。
    Schedule框架Rxjava 2.x版本调度器种类:

    • Schedulers.computation() : 用于cpu密集型计算任务,即不会被被I/O等操作限制性能的耗时操作,例如XML,JSON文件的解析,Bitmap图片的压缩取样等,具有固定的线程池,大小为CPU核数,不可以用于IO操作,因为IO操作的等待时间会浪费cpu
    • Schedulers.from(@NonNull Excutor excutor): 指定一个线程调度器,由此调度器来控制任务的执行策略。
    • Schedulers.io():用于IO密集型的操作,例如写磁盘操作,查询数据库,网络访问,具有线程缓存机制,在此调度器接收到任务之后,先检查线程缓存池中是否有空闲的线程可用,如果有,复用,如果没有则 创建新的线程,并将其加入到线程池中,如果每次都没有空闲的线程使用,可以无上限的创建线程。
    • Schedulers.newThread(): 在每执行一个任务时创建一个新的线程,不具有线程缓存机制,由于创建一个线程比起复用一个线程更加耗时耗力,虽然使用Schedulers.io()的地方都可以使用Schedulers.newThread(),但是总体上的Schedulers.newThread()的效率没有Schedulers.io()的高。
    • Schedulers.trampoline():在当前线程立即执行任务,如果当前线程有任务在执行,则会将其暂停下来,等插入进来的任务执行完成之后,再将未完成的任务接着执行。
    • Schedulers.single():拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有任务在执行的时候其他任务将按照队列先进先出的顺序依次执行
    • AndroidSchedulers.mainThread():在Andriod UI线程中执行任务,属于Android的专属定制。

    二、使用步骤

    1.源码

    SingleScheduler中的关键代码

        /**
         * Constructs a SingleScheduler with the given ThreadFactory and prepares the
         * single scheduler thread.
         * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
         *                      system properties for configuring new thread creation. Cannot be null.
         */
        public SingleScheduler(ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
            executor.lazySet(createExecutor(threadFactory));
        }
    
        static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {
            return SchedulerPoolFactory.create(threadFactory);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    ScheduledExecutorService中的关键代码

        /**
         * Creates a ScheduledExecutorService with the given factory.
         * @param factory the thread factory
         * @return the ScheduledExecutorService
         */
        public static ScheduledExecutorService create(ThreadFactory factory) {
            final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
            tryPutIntoPool(PURGE_ENABLED, exec);
            return exec;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    可以看到SingleScheduler最终实现还是通过newScheduledThreadPool实现的SingleScheduler这个类只会创建一次,所以使用Schedulers.single()时也只会创建一个线程。

    2.代码实现

        private void rxjavaSingleTest() {
            Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        for (int i = 0; i < 3; i++) {
                            System.out.println("发射线程:" + Thread.currentThread().getName() + "---->" + "发射:" + i);
                            e.onNext(i);
                        }
                        e.onComplete();
                    }
                })
                .subscribeOn(Schedulers.single())//设置可观察对象在Schedulers.single()的线程中发射数据
                .observeOn(Schedulers.single())//指定map操作符在Schedulers.single()的线程中处理数据
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer i) throws Exception {
                        System.out.println("处理线程:" + Thread.currentThread().getName() + "---->" + "处理:" + i);
                        return i;
                    }
                })
                .observeOn(Schedulers.single())//设置观察者在Schedulers.single()的线程中j接收数据
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer i) throws Exception {
                        System.out.println("接收线程:" + Thread.currentThread().getName() + "---->" + "接收:" + i);
                    }
                }).isDisposed();
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    Log打印:

    15:33:25.949 I 发射线程:RxSingleScheduler-1---->发射:0
    15:33:25.949 I 发射线程:RxSingleScheduler-1---->发射:1
    15:33:25.949 I 发射线程:RxSingleScheduler-1---->发射:2
    15:33:25.950 I 处理线程:RxSingleScheduler-1---->处理:0
    15:33:25.950 I 处理线程:RxSingleScheduler-1---->处理:1
    15:33:25.950 I 处理线程:RxSingleScheduler-1---->处理:2
    15:33:25.950 I 接收线程:RxSingleScheduler-1---->接收:0
    15:33:25.952 I 接收线程:RxSingleScheduler-1---->接收:1
    15:33:25.953 I 接收线程:RxSingleScheduler-1---->接收:2
    15:33:26.116 I 发射线程:RxSingleScheduler-1---->发射:0
    15:33:26.116 I 发射线程:RxSingleScheduler-1---->发射:1
    15:33:26.118 I 发射线程:RxSingleScheduler-1---->发射:2
    15:33:26.118 I 处理线程:RxSingleScheduler-1---->处理:0
    15:33:26.118 I 处理线程:RxSingleScheduler-1---->处理:1
    15:33:26.118 I 处理线程:RxSingleScheduler-1---->处理:2
    15:33:26.119 I 接收线程:RxSingleScheduler-1---->接收:0
    15:33:26.119 I 接收线程:RxSingleScheduler-1---->接收:1
    15:33:26.120 I 接收线程:RxSingleScheduler-1---->接收:2

    连续调用rxjavaSingleTest多次,通过Log可以看到线程是顺序执行的且共用一个叫RxSingleScheduler的线程。

    总结

    Rxjava使用起来还是蛮方便的,不用担心调用Observable create、subscribeOn后会创建新的线程,它会复用同一线程的。也不用像java自带的线程池那样去复用提交任务,大大简化编码方式。

    参考:

  • 相关阅读:
    数字马力面试题
    驱动开发:内核封装TDI网络通信接口
    【LeetCode热题100】--53.最大子数组和
    【JavaSE】异常处理
    在PowerBI中提取IFC文件中的数据
    聚观早报 | 极越07正式上市;宝骏云海正式上市
    2023山东健博会/营养健康展/特医食品展/山东大健康展
    关于C++拷贝控制
    [原创]程序员团队如何管理不听话的下属?史上最狠解决方案来了!
    5.5线程同步机制类封装及线程池实现
  • 原文地址:https://blog.csdn.net/Bluechalk/article/details/127599881