最近有一个项目需求,考虑到性能问题,需要使用单线程顺序执行,java中自然是Executors.newSingleThreadExecutor()。但我想用rxjava实现一样的效果,本篇文章记录rxjava2实现newSingleThreadExecutor同样的效果。
rxjava中的线程调度管理如下图,线程都是在SchedulerPoolFactory类的create中创建的。
Rxjava 2.x版本调度器种类:
SingleScheduler中的关键代码
/**
* Constructs a SingleScheduler with the given ThreadFactory and prepares the
* single scheduler thread.
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
*/
public SingleScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
executor.lazySet(createExecutor(threadFactory));
}
static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {
return SchedulerPoolFactory.create(threadFactory);
}
ScheduledExecutorService中的关键代码
/**
* Creates a ScheduledExecutorService with the given factory.
* @param factory the thread factory
* @return the ScheduledExecutorService
*/
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
tryPutIntoPool(PURGE_ENABLED, exec);
return exec;
}
可以看到SingleScheduler最终实现还是通过newScheduledThreadPool实现的SingleScheduler这个类只会创建一次,所以使用Schedulers.single()时也只会创建一个线程。
private void rxjavaSingleTest() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 3; i++) {
System.out.println("发射线程:" + Thread.currentThread().getName() + "---->" + "发射:" + i);
e.onNext(i);
}
e.onComplete();
}
})
.subscribeOn(Schedulers.single())//设置可观察对象在Schedulers.single()的线程中发射数据
.observeOn(Schedulers.single())//指定map操作符在Schedulers.single()的线程中处理数据
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer i) throws Exception {
System.out.println("处理线程:" + Thread.currentThread().getName() + "---->" + "处理:" + i);
return i;
}
})
.observeOn(Schedulers.single())//设置观察者在Schedulers.single()的线程中j接收数据
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer i) throws Exception {
System.out.println("接收线程:" + Thread.currentThread().getName() + "---->" + "接收:" + i);
}
}).isDisposed();
}
Log打印:
15:33:25.949 I 发射线程:RxSingleScheduler-1---->发射:0
15:33:25.949 I 发射线程:RxSingleScheduler-1---->发射:1
15:33:25.949 I 发射线程:RxSingleScheduler-1---->发射:2
15:33:25.950 I 处理线程:RxSingleScheduler-1---->处理:0
15:33:25.950 I 处理线程:RxSingleScheduler-1---->处理:1
15:33:25.950 I 处理线程:RxSingleScheduler-1---->处理:2
15:33:25.950 I 接收线程:RxSingleScheduler-1---->接收:0
15:33:25.952 I 接收线程:RxSingleScheduler-1---->接收:1
15:33:25.953 I 接收线程:RxSingleScheduler-1---->接收:2
15:33:26.116 I 发射线程:RxSingleScheduler-1---->发射:0
15:33:26.116 I 发射线程:RxSingleScheduler-1---->发射:1
15:33:26.118 I 发射线程:RxSingleScheduler-1---->发射:2
15:33:26.118 I 处理线程:RxSingleScheduler-1---->处理:0
15:33:26.118 I 处理线程:RxSingleScheduler-1---->处理:1
15:33:26.118 I 处理线程:RxSingleScheduler-1---->处理:2
15:33:26.119 I 接收线程:RxSingleScheduler-1---->接收:0
15:33:26.119 I 接收线程:RxSingleScheduler-1---->接收:1
15:33:26.120 I 接收线程:RxSingleScheduler-1---->接收:2
连续调用rxjavaSingleTest多次,通过Log可以看到线程是顺序执行的且共用一个叫RxSingleScheduler的线程。
Rxjava使用起来还是蛮方便的,不用担心调用Observable create、subscribeOn后会创建新的线程,它会复用同一线程的。也不用像java自带的线程池那样去复用提交任务,大大简化编码方式。
参考: