- Observable.create(new ObservableOnSubscribe
() { - @Override
- public void subscribe(@NonNull ObservableEmitter
emitter) throws Throwable { -
- }
- }).map(new Function
() { - @Override
- public Integer apply(String s) throws Throwable {
- return null;
- }
- }).subscribe(new Observer
() { - @Override
- public void onSubscribe(@NonNull Disposable d) {
-
- }
-
- @Override
- public void onNext(@NonNull Integer character) {
-
- }
-
- @Override
- public void onError(@NonNull Throwable e) {
-
- }
-
- @Override
- public void onComplete() {
-
- }
- });
流程分析:

对于被观察者来说,装饰器模式体现在subscribeActual方法
对于观察者来说,装饰器模式体现在onNext方法
- Observable.create(new ObservableOnSubscribe
() { - @Override
- public void subscribe(@NonNull ObservableEmitter
emitter) throws Throwable { - emitter.onNext("A");
- emitter.onComplete();
- }
- }).subscribe(new Observer
() { - @Override
- public void onSubscribe(@NonNull Disposable d) {
-
- }
-
- @Override
- public void onNext(@NonNull String s) {
- System.out.println(s);
- }
-
- @Override
- public void onError(@NonNull Throwable e) {
-
- }
-
- @Override
- public void onComplete() {
-
- }
- });
这段代码,主要就是三块儿,第一块儿是被观察者Observable,第二块是观察者Observer,第三块是订阅观察者和被观察者。
我们要通过源码把订阅的细节和如何使用emiter发射到observer的。
- public static <@NonNull T> Observable
create(@NonNull ObservableOnSubscribe source) { - Objects.requireNonNull(source, "source is null");
- return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
- }
首先这个Observable.create()方法的返回值还是Observable,参数是ObservableOnSubscribe
- public interface ObservableOnSubscribe<@NonNull T> {
-
- /**
- * Called for each {@link Observer} that subscribes.
- * @param emitter the safe emitter instance, never {@code null}
- * @throws Throwable on error
- */
- void subscribe(@NonNull ObservableEmitter
emitter) throws Throwable; - }
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
首先看RxJavaPlugins.onAssembly()什么作用:
- public static <@NonNull T> Observable
onAssembly(@NonNull Observable source) { - Function super Observable, ? extends Observable> f = onObservableAssembly;
- if (f != null) {
- return apply(f, source);
- }
- return source;
- }
这个其实是一个预留的可以插入的hook节点,默认情况下这个onObservableAssembly是null,所以直接返回的是new ObservableCreate<>(source),只有在设置了:
- RxJavaPlugins.setOnObservableAssembly(new Function
() { - @Override
- public Observable apply(Observable observable) throws Throwable {
- System.out.println("监控日志");
- return observable;
- }
- });
-
- public static void setOnObservableAssembly(@Nullable Function super Observable, ? extends Observable> onObservableAssembly) {
- if (lockdown) {
- throw new IllegalStateException("Plugins can't be changed anymore");
- }
- RxJavaPlugins.onObservableAssembly = onObservableAssembly;
- }
设置了setOnObservableAssembly之后onObservableAssembly就不为null了,这时每次执行Observable.create()方法,都会调用apply方法,这样就可以在apply方法中添加一些监控的日志等,所以说这是一个预留的hook节点,只有设置之后才会有效,如果不设置相当于直接返回new ObservableCreate<>(source),所以Observable.create()方法最后的返回值是ObservableCreate是一个被观察者Observable。
Observable.create()的参数ObservableOnSubscribe,作为ObservableCreate<>(source)的参数,维护在了被观察者ObservableCreate中。
然后我们看,具体如何订阅的:
首先订阅操作是Observable.subscribe(observer),所以我们看Observable的subscribe方法:
- public final void subscribe(@NonNull Observer super T> observer) {
- Objects.requireNonNull(observer, "observer is null");
- try {
- observer = RxJavaPlugins.onSubscribe(this, observer);
-
- Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
-
- subscribeActual(observer);
- } catch (NullPointerException e) { // NOPMD
- throw e;
- } catch (Throwable e) {
- Exceptions.throwIfFatal(e);
- // can't call onError because no way to know if a Disposable has been set or not
- // can't call onSubscribe because the call might have set a Subscription already
- RxJavaPlugins.onError(e);
-
- NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
- npe.initCause(e);
- throw npe;
- }
- }
可以看到subscribe方法主要就是调用了subscribeActual方法,可是subscribeActual是一个抽象方法,我们知道这一定是在子类中有具体的实现,我们知道Observable的实现类是ObservableCreate,所以我们看ObservableCreate的subscribeActual方法:
- protected void subscribeActual(Observer super T> observer) {
- CreateEmitter
parent = new CreateEmitter<>(observer); - observer.onSubscribe(parent);
-
- try {
- source.subscribe(parent);
- } catch (Throwable ex) {
- Exceptions.throwIfFatal(ex);
- parent.onError(ex);
- }
- }
首先创建了CreateEmitter,并且把观察者observer当作参数维护在CreateEmitter中,然后调用观察者observer的onSubscribe方法,这就是为啥observer的onSubscribe方法最先执行,因为一执行subscribe订阅方法,就会调用到observer的onSubscribe方法。
订阅重点到了:
source.subscribe(parent);
source就是Observable.create()的参数ObservableOnSubscribe,parent是CreateEmitter,CreateEmitter持有observer,通过source.subscribe(parent);之后ObservableOnSubscribe持有了CreateEmitter,这个source.subscribe方法的作用是回调,这也是事件发射最起始的地方。
观察者模式可以理解成被观察者持有观察者的引用,那总结一下上面的引用的持有链,观察者observer被CreateEmitter持有,CreateEmitter被ObservableOnSubscribe持有 ,ObservableOnSubscribe被被观察者ObservableCreate持有,并以回调的方式使用CreateEmitter,所以当我们在Observable.create()的参数ObservableOnSubscribe中,可以操作CreateEmitter来发送事件,最终到oberver。

- public final <@NonNull R> Observable
map(@NonNull Function super T, ? extends R> mapper) { - Objects.requireNonNull(mapper, "mapper is null");
- return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
- }
-
- public final class ObservableMap
extends AbstractObservableWithUpstream - {
-
- final Function super T, ? extends U> function;
-
- public ObservableMap(ObservableSource
source, Function super T, ? extends U> function) { - super(source);
- this.function = function;
- }
-
- public void subscribeActual(Observer super U> t) {
- source.subscribe(new MapObserver
(t, function)); - }
-
-
-
- }
首先source是在map方法中的this,这个this就是上一个Observable,可以看到ObservableMap是用source持有了上一个Observable,且在subscribeActual方法中调用了source.subscribe(new MapObserver
所以我们总结出:
对于被观察者来说,装饰器模式体现在subscribeActual方法
对于观察者来说,装饰器模式体现在onNext方法
装饰器模式对于观察者链的作用,首先观察者和被观察者的关系是一一对应的,终点的观察者被最近的被观察者订阅,并且在被观察者的装饰器方法subscribeActual中,会创建一个新的Observer,让上一个被观察者和新创建的观察者订阅,新创建的观察者持有终点观察者,这样就能把整个观察链打通。
dispose方法:
- public void dispose() {
- DisposableHelper.dispose(this);
- }
DisposableHelper 的 dispose() 方法的定义如下。按照上面的分析,dispose() 的时候传入的 this 就是 CreateEmitter. 并且它是继承了 AtomicReference 的。
- public static boolean dispose(AtomicReference
field) { - Disposable current = field.get();
- Disposable d = DISPOSED;
- if (current != d) {
- current = field.getAndSet(d);
- if (current != d) {
- if (current != null) {
- current.dispose();
- }
- return true;
- }
- }
- return false;
- }
对 AtomicReference,相比大家都不陌生,它是一个原子类型的引用。这里正式通过对该原子类型引用的赋值来完成取消订阅的——通过一个原子操作将其设置为 DISPOSED.
这个DisposableHelper是什么:
- public enum DisposableHelper implements Disposable {
- /**
- * The singleton instance representing a terminal, disposed state, don't leak it.
- */
- DISPOSED
- ;
它是一个枚举类型,只有一个值是DISPOSED,同时它也实现了Disposable接口。
看一个DisposableHelper的方法:
- public static boolean isDisposed(Disposable d) {
- return d == DISPOSED;
- }
-
- public void onNext(T t) {
- if (t == null) {
- onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
- return;
- }
- if (!isDisposed()) {
- observer.onNext(t);
- }
- }
从这就可以看出来,当把Disposable赋值为DISPOSED时,就说明上下游已经断开了,不让发送了
- public static Scheduler io() {
- return RxJavaPlugins.onIoScheduler(IO);
- }
-
- //🌟这个IO就是一个Scheduler
- static final Scheduler IO;
-
- IO = RxJavaPlugins.initIoScheduler(new IOTask());
-
- static final class IOTask implements Supplier
{ - @Override
- public Scheduler get() {
- return IoHolder.DEFAULT;
- }
- }
-
- static final class IoHolder {
- static final Scheduler DEFAULT = new IoScheduler();
- }
从这个调用链下来,我们知道了,这个Schedulers.io()是一个Scheduler,它的实现类是一个IoScheduler
调用了Schedulers.io()会创建IoScheduler对象,肯定会执行IoScheduler的构造方法,我们看一下这个IoScheduler的构造方法
- public IoScheduler() {
- this(WORKER_THREAD_FACTORY);
- }
-
- public IoScheduler(ThreadFactory threadFactory) {
- this.threadFactory = threadFactory;
- this.pool = new AtomicReference<>(NONE);
- start();
- }
-
- public void start() {
- CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
- if (!pool.compareAndSet(NONE, update)) {
- update.shutdown();
- }
- }
我们看到通过this.pool可以获取到CachedWorkerPool,重点是这个CachedWorkerPool是啥?
- static final class CachedWorkerPool implements Runnable {
-
- private final ScheduledExecutorService evictorService;
- private final Future> evictorTask;
-
- CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
- this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
- this.expiringWorkerQueue = new ConcurrentLinkedQueue<>();
- this.allWorkers = new CompositeDisposable();
- this.threadFactory = threadFactory;
-
- ScheduledExecutorService evictor = null;
- Future> task = null;
- if (unit != null) {
- evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
- task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
- }
- evictorService = evictor;
- evictorTask = task;
- }
-
-
- }
可以看出CachedWorkerPool实现了Runnable接口,同时它内部维护了线程池。
我们可以总结了Schedulers.io()创建了一个IoScheduler对象,这个对象内部维护了一个线程池。
- public final Observable
subscribeOn(@NonNull Scheduler scheduler) { - Objects.requireNonNull(scheduler, "scheduler is null");
- return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
- }
和其他的map等方法一样,subscribeOn()也是创建一个实现类为ObservableSubscribeOn的Observable,这个ObservableSubscribeOn中持有链式调用中的上一个Observable,同时在装饰器的主要方法subscribeActual内部实现链式订阅,new了一个实现类为new SubscribeOnObserver<>(observer)的Observer,然后把上一个Observable和这个SubscribeOnObserver订阅,我们总结是链式调用的观察者模式中,总是上一个被观察者Observable订阅下一个Observer观察者,因为链式调用中开头一定是先有被观察者Observable。
这个SubscribeOnObserver持有下一个Observer的引用,这也是装饰器模式的运用,装饰器的主要作用方法是onNext,通过onNext方法不断的调用持有的下一个observer的onNext来达到链式调用的效果,其实订阅的本质就是要被观察者Observable持有观察者Observer,又因为链式调用开始一定是被观察者,所以一定是上一个被观察者被下一个观察者订阅,而notify下发的本质就是被观察者发出notify动作,然后通过onNext方法通知它的观察者,观察者的onNext方法中先执行参数方法,参数方法就是例如Map的参数方法就是转换函数new Function
我们已知Observable的订阅方法是subscribeActual,下面我们继续看这个ObservableSubscribeOn的subscribeActual方法干了什么:
- final Scheduler scheduler;
-
- public ObservableSubscribeOn(ObservableSource
source, Scheduler scheduler) { - super(source);
- this.scheduler = scheduler;
- }
-
- @Override
- public void subscribeActual(final Observer super T> observer) {
- final SubscribeOnObserver
parent = new SubscribeOnObserver<>(observer); -
- observer.onSubscribe(parent);
-
- parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
- }
首先这个scheduler就是IoScheduler,主要就是这一句
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
中的
scheduler.scheduleDirect(new SubscribeTask(parent))
先看 new SubscribeTask(parent)):
- final class SubscribeTask implements Runnable {
- private final SubscribeOnObserver
parent; -
- SubscribeTask(SubscribeOnObserver
parent) { - this.parent = parent;
- }
-
- @Override
- public void run() {
- source.subscribe(parent);
- }
- }
很简单是一个Runnable,run方法中的source就是上一个Observable,和SubscribeOnObserver进行了订阅,这意味着如果这个Runnable在其他线程中执行,则从执行这个run方法开始,后面所有的代码包括被观察者调用观察者的observer的onNext方法并且传递到终点的观察者的onNext方法都会执行在这个线程中。
首先我们知道订阅是一个从下往上的过程,线程的转换是发生在订阅时,所以如果是:
- ObservableA
- .subscribeOn(Schedulers.A())
- ObervableB
- .subscribeOn(Schedulers.B())
- ObervableC
- .subscribeOn(Schedulers.C())
那它最终会执行在线程A中,这也是我们常说的第一个subscribeOn的线程有效。
那如何把这个Runnable对象交给线程执行的呢?
那就要看我们的scheduler.scheduleDirect方法:
- public Disposable scheduleDirect(@NonNull Runnable run) {
- return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
- }
-
- public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
- final Worker w = createWorker();
-
- final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
-
- DisposeTask task = new DisposeTask(decoratedRun, w);
-
- w.schedule(task, delay, unit);
-
- return task;
- }
我们已经知道scheduler的实现类是IoScheduler,我们一句一句分析scheduleDirect方法
final Worker w = createWorker();
对应 IoScheduler中的:
- public Worker createWorker() {
- return new EventLoopWorker(pool.get());
- }
我们已经知道pool.get()就是获取到能操作线程池的CachedWorkerPool类,而这个EventLoopWorker是用来管理操作线程池的类:
- static final class EventLoopWorker extends Scheduler.Worker implements Runnable {
- private final CompositeDisposable tasks;
- private final CachedWorkerPool pool;
- private final ThreadWorker threadWorker;
-
- final AtomicBoolean once = new AtomicBoolean();
-
- EventLoopWorker(CachedWorkerPool pool) {
- this.pool = pool;
- this.tasks = new CompositeDisposable();
- this.threadWorker = pool.get();
- }
-
- @Override
- public void dispose() {
- if (once.compareAndSet(false, true)) {
- tasks.dispose();
-
- if (USE_SCHEDULED_RELEASE) {
- threadWorker.scheduleActual(this, 0, TimeUnit.NANOSECONDS, null);
- } else {
- // releasing the pool should be the last action
- pool.release(threadWorker);
- }
- }
- }
-
- @Override
- public void run() {
- pool.release(threadWorker);
- }
-
- @Override
- public boolean isDisposed() {
- return once.get();
- }
-
- @NonNull
- @Override
- public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
- if (tasks.isDisposed()) {
- // don't schedule, we are unsubscribed
- return EmptyDisposable.INSTANCE;
- }
-
- return threadWorker.scheduleActual(action, delayTime, unit, tasks);
- }
- }
我们看第二句:
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
run是SubscribeTask也就是我们线程切换的runnable,RxJavaPlugins.onSchedule就是RxJava常用的Hook预留,它和RxJavaPlugins.onAssembly作用一样,就是给外部一个能插入执行此代码的预留,默认还是返回原Runnable。
DisposeTask task = new DisposeTask(decoratedRun, w); ...... return task;
把runnable包装成实现Disposable和Runnable接口的包装类,让他既能中断又能给线程执行,看最后会把task返回,其实主要是为了能中断。
看第三句:
w.schedule(task, delay, unit);
这句对应IoScheduler中的EventLoopWorker的schedule方法:
- public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
- if (tasks.isDisposed()) {
- // don't schedule, we are unsubscribed
- return EmptyDisposable.INSTANCE;
- }
-
- return threadWorker.scheduleActual(action, delayTime, unit, tasks);
- }
-
-
- public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
- Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
-
- ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
-
- if (parent != null) {
- if (!parent.add(sr)) {
- return sr;
- }
- }
-
- Future> f;
- try {
- if (delayTime <= 0) {
- f = executor.submit((Callable
- } else {
- f = executor.schedule((Callable
- }
- sr.setFuture(f);
- } catch (RejectedExecutionException ex) {
- if (parent != null) {
- parent.remove(sr);
- }
- RxJavaPlugins.onError(ex);
- }
-
- return sr;
- }
可以看到它实际上就是把runnable转换成Callable之后交给线程池执行了。
到这里我们知道了subscribeOn(Schedulers.io())切换线程就是把订阅操作放到了runnable中,并且交给了线程池执行。
- public static Scheduler mainThread() {
- return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
- }
-
- private static final Scheduler MAIN_THREAD =
- RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);
-
- private static final class MainHolder {
- static final Scheduler DEFAULT
- = new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
- }
通过调用链,我们可以知道AndroidSchedulers.mainThread(),最后创建了一个和IoScheduler类似的HandlerScheduler,并且HandlerScheduler持有mainThread的Handler,这下我们应该能想到,肯定是要把任务交给Handler去执行,这样就能实现执行在主线程中,具体细节下面分析。
- public final Observable
observeOn(@NonNull Scheduler scheduler) { - return observeOn(scheduler, false, bufferSize());
- }
-
- public final Observable
observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) { - Objects.requireNonNull(scheduler, "scheduler is null");
- ObjectHelper.verifyPositive(bufferSize, "bufferSize");
- return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
- }
主要就是这个ObservableObserveOn,它其实就是一个Observable,Observable订阅的时候会执行subscribeActual方法,所以我们重点看这个方法:
- protected void subscribeActual(Observer super T> observer) {
- ......
- Scheduler.Worker w = scheduler.createWorker();
-
- source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
- }
- }
这个scheduler很明显是HandlerScheduler,scheduler.createWorker的代码:
- public Worker createWorker() {
- return new HandlerWorker(handler, async);
- }
source是上一个Observable,要订阅新创建的Observer即ObserveOnObserver,ObserveOnObserver的装饰器主要方法是onNext,下面我们主要看ObserveOnObserver的onNext方法:
- public void onNext(T t) {
- if (done) {
- return;
- }
- if (sourceMode != QueueDisposable.ASYNC) {
- queue.offer(t);
- }
- schedule();
- }
-
- void schedule() {
- if (getAndIncrement() == 0) {
- worker.schedule(this);
- }
- }
这个worker是HandlerWorker,这个this就是本身实现了Runnable接口,这个runnable之后会交给handler去执行,真正执行的就是ObserveOnObserver的run方法,我们结下来看HandlerWorker的schedule方法:
- public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
- if (run == null) throw new NullPointerException("run == null");
- if (unit == null) throw new NullPointerException("unit == null");
-
- if (disposed) {
- return Disposable.disposed();
- }
-
- run = RxJavaPlugins.onSchedule(run);
-
- ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
-
- Message message = Message.obtain(handler, scheduled);
- message.obj = this; // Used as token for batch disposal of this worker's runnables.
-
- if (async) {
- message.setAsynchronous(true);
- }
-
- handler.sendMessageDelayed(message, unit.toMillis(delay));
-
- // Re-check disposed state for removing in case we were racing a call to dispose().
- if (disposed) {
- handler.removeCallbacks(scheduled);
- return Disposable.disposed();
- }
-
- return scheduled;
- }
主要是这几句:
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
handler.sendMessageDelayed(message, unit.toMillis(delay));
从这里可以看出来,就是把任务交给了主线程的Handler去执行,达到的让任务执行在主线程的效果,当主线程执行时会执行ObserveOnObserver的run方法:
- @Override
- public void run() {
- if (outputFused) {
- drainFused();
- } else {
- drainNormal();
- }
- }
-
- void drainFused() {
- ......
-
- downstream.onNext(null);
-
- ......
- }
-
- void drainNormal() {
- ......
- final Observer super T> a = downstream;
-
- ......
-
- a.onNext(v);
- }
-
- ......
- }
不难看出,主线程的handler执行的代码,就是调用下游的observer的onNext方法,这样就能保证从observeOn(AndroidSchedulers.mainThread())执行后,下面的observer的onNext方法都是执行在主线程中。
总结一下, observeOn()的作用就是通过主线程的Handler,调用的下一个Observer的onNext方法,这样就能实现下一个的observer的onNext方法的调用是主线程。
当出现异步请求时,如果上游请求的速度远远超过下游处理的速度,就会出现事件积压在队列中,积压过多就会出现OOM。
如何解决呢?
为了解决这个问题设计了:下游提出自己的需求,上游按照下游的需求发出事件,如果上游超过了下游提出的请求,就会按照5种策略来处理。
BaseEmitter继承AtomicLong,保证事件增减的原子性- abstract static class BaseEmitter
- extends AtomicLong
- implements FlowableEmitter
, Subscription { - }
-
- @Override
- public final void request(long n) {
- if (SubscriptionHelper.validate(n)) {
- BackpressureHelper.add(this, n);
- onRequested();
- }
- }
-
- public static long add(AtomicLong requested, long n) {
- for (;;) {
- long r = requested.get();
- if (r == Long.MAX_VALUE) {
- return Long.MAX_VALUE;
- }
- long u = addCap(r, n);
- if (requested.compareAndSet(r, u)) {
- return r;
- }
- }
- }
发生器XXXEmitter通过request方法记录下游可以处理事件的个数,上游通过requtsted方法获取那个记录的个数,本质就是XXXEmitter是AtomicLong。
request和requested使用:
如观察者可接收事件数量 = 1,当被观察者发送第2个事件时,就会抛出异常
- Flowable.create(new FlowableOnSubscribe
() { - @Override
- public void subscribe(FlowableEmitter
emitter) throws Exception { -
- // 1. 调用emitter.requested()获取当前观察者需要接收的事件数量
- Log.d(TAG, "观察者可接收事件数量 = " + emitter.requested());
-
- // 2. 每次发送事件后,emitter.requested()会实时更新观察者能接受的事件
- // 即一开始观察者要接收10个事件,发送了1个后,会实时更新为9个
- Log.d(TAG, "发送了事件 1");
- emitter.onNext(1);
- Log.d(TAG, "发送了事件1后, 还需要发送事件数量 = " + emitter.requested());
-
- Log.d(TAG, "发送了事件 2");
- emitter.onNext(2);
- Log.d(TAG, "发送事件2后, 还需要发送事件数量 = " + emitter.requested());
-
- emitter.onComplete();
- }
- }, BackpressureStrategy.ERROR)
- .subscribe(new Subscriber
() { - @Override
- public void onSubscribe(Subscription s) {
-
- Log.d(TAG, "onSubscribe");
- s.request(1); // 设置观察者每次能接受1个事件
-
- }
-
- @Override
- public void onNext(Integer integer) {
- Log.d(TAG, "接收到了事件" + integer);
- }
-
- @Override
- public void onError(Throwable t) {
- Log.w(TAG, "onError: ", t);
- }
-
- @Override
- public void onComplete() {
- Log.d(TAG, "onComplete");
- }
- });
当FlowableEmitter.requested()减到0时,则代表观察者已经不可接收事件,这时就由策略决定如何处理多出的事件。
- Flowable.create(new FlowableOnSubscribe
() { - @Override
- public void subscribe(FlowableEmitter
emitter) throws Exception { -
- // 1. 调用emitter.requested()获取当前观察者需要接收的事件数量
- Log.d(TAG, "观察者可接收事件数量 = " + emitter.requested());
-
- // 2. 每次发送事件后,emitter.requested()会实时更新观察者能接受的事件
- // 即一开始观察者要接收10个事件,发送了1个后,会实时更新为9个
-
- for (int i = 0; i < 129; i++) {
- Thread.sleep(100);
- Log.d(TAG, "发送了事件 " + i);
- emitter.onNext(i);
- Log.d(TAG, "发送了事件1后, 还需要发送事件数量 = " + emitter.requested());
- }
- emitter.onComplete();
- }
- }, BackpressureStrategy.ERROR)
- .subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(new Subscriber
() { - @Override
- public void onSubscribe(Subscription s) {
-
- Log.d(TAG, "onSubscribe");
- s.request(1);
-
- }
-
- @Override
- public void onNext(Integer integer) {
- Log.i(TAG, "接收到了事件" + integer);
- }
-
- @Override
- public void onError(Throwable t) {
- Log.w(TAG, "onError: ", t);
- }
-
- @Override
- public void onComplete() {
- Log.d(TAG, "onComplete");
- }
- });
是异步时,会有一个事件缓存池,默认是128,当出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时,就会触发策略处理
- abstract static class NoOverflowBaseAsyncEmitter
extends BaseEmitter { -
- private static final long serialVersionUID = 4127754106204442833L;
-
- NoOverflowBaseAsyncEmitter(Subscriber super T> actual) {
- super(actual);
- }
-
- @Override
- public final void onNext(T t) {
- if (isCancelled()) {
- return;
- }
-
- if (t == null) {
- onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
- return;
- }
-
- if (get() != 0) {
- actual.onNext(t);
- BackpressureHelper.produced(this, 1);
- } else {
- onOverflow();
- }
- }
-
- abstract void onOverflow();
- }
我们可以看到NoOverflowBaseAsyncEmitter继承于BaseEmitter,同时还实现了onNext方法。从这里,我们可以看出AtomicLong的作用了,当调用一次Emitter的onNext方法时,先判断当前值是否为0,如果不为0,那么调用Subscriber的onNext方法,同时使用BackpressureHelper使当前的值减一;如果当前的值已经为0了,会调用onOverflow方法,此时根据不同的策略,onOverflow方法实现就不尽相同。
注意如果不调用reqeust方法,默认为0.
问题:发送事件速度 > 接收事件 速度,即流速不匹配
具体表现:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时
处理方式:直接抛出异常MissingBackpressureException
问题:发送事件速度 > 接收事件 速度,即流速不匹配
具体表现是:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时
处理方式:友好提示:缓存区满了
问题:发送事件速度 > 接收事件 速度,即流速不匹配
具体表现是:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时
处理方式:将缓存区大小设置成无限大
即被观察者可无限发送事件观察者,但实际上是存放在缓存区
但要注意内存情况,防止出现OOM
可以接收超过原先缓存区大小(128)的事件数量了
问题:发送事件速度 > 接收事件 速度,即流速不匹配
具体表现是:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时
处理方式:超过缓存区大小(128)的事件丢弃
如发送了150个事件,仅保存第1 - 第128个事件,第129 -第150事件将被丢弃
被观察者一下子发送了150个事件,点击按钮接收时观察者接收了128个事件;再次点击接收时却无法接受事件,这说明超过缓存区大小的事件被丢弃了。
问题:发送事件速度 > 接收事件 速度,即流速不匹配
具体表现是:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时
处理方式:只保存最新(最后)事件,超过缓存区大小(128)的事件丢弃
即如果发送了150个事件,缓存区里会保存129个事件(第1-第128 + 第150事件)
被观察者一下子发送了150个事件,点击按钮接收时观察者接收了128个事件;
再次点击接收时却接收到1个事件(第150个事件),这说明超过缓存区大小的事件仅保留最后的事件(第150个事件)