

起点(分发事件(Path): 我饿了)--下楼--去餐厅---点餐--->终点(吃饭消费事件)
程序中的例子
起点(分发事件,点击登录)---登录API---请求服务器---获取响应码---->终点(更新UI登录成功 消费事件)
有一个起点 和 一个终点, 起点开始流向我们的"事件" 把事件流向到终点,只不过在流向的过程中,可以增加拦截,
拦截时可以对"事件进行改变", 终点只关心它上一个拦截;
---------------------------------------------------------------
Observable 根据上层变化而变化过滤 Observer
被观察者 订阅(上层区域 和 下层区域 关联) 观察者
起点 终点
---------------------------------------------------------------
标准观察者: RxJava的观察者模式:


- // 总数据Bean
- public class ProjectBean {
- private int errorCode;
- private String errorMsg;
- private List
data; -
-
- public static class DataBean {
- private int courseId;
- private int id;
- private String name;
- private int order;
- private int parentChapterId;
- private boolean userControlSetTop;
- private int visible;
- private List> children;
- }
- }
-
-
- // Item数据
- public class ProjectItem {
- private DataBean data;
- private int errorCode;
- private String errorMsg;
- public static class DataBean {
- private int curPage;
- private int offset;
- private boolean over;
- private int pageCount;
- private int size;
- private int total;
- private List
datas; - public static class DatasBean {
-
-
- private String apkLink;
- private String desc;
- private String envelopePic;
- private boolean fresh;
- private int id;
- private String link;
- private String niceDate;
- private String origin;
- private String prefix;
- private String projectLink;
- private List
tags; -
-
- public static class TagsBean {
- private String name; // 项目
- private String url; ///project/list/1?cid=294
- }
- }
- }
- }
- public interface WanAndroidApi {
- //总数据
- @GET("project/tree/json")
- Observable
getProject(); //异步线程耗时操作 -
-
- //Item数据
- @GET("project/list/{pageIndex}/json") //?cid=294 异步线程 耗时操作
- Observable
getProjectItem(@Path("pageIndex") int pageIndex, @Query("cid") int cid); - }
-
-
- public class UseActivity extends AppCompatActivity {
- private WanAndroidApi api;
- public static final String TAG = "rxJava";
- @Override
- protected void onCreate(Bundle savedInstanceState) {
- super.onCreate(savedInstanceState);
- setContentView(R.layout.activity_use);
- api = HttpUtil.getOnlineCookieRetrofit().create(WanAndroidApi.class);
- antiShakeAction();
- }
-
-
- //TODO Retrofit+RxJava 查询 项目分类 (总数据查询)
- Disposable disposable;
- public void getProjectAction(View view) {
- disposable = api.getProject()
- .subscribeOn(Schedulers.io()) //给上面分配异步线程
- .observeOn(AndroidSchedulers.mainThread())//给下面主线程
- .subscribe(new Consumer<ProjectBean>() {//简化版
- @Override
- public void accept(ProjectBean projectBean) throws Exception {
- Log.d(TAG, "accept: " + projectBean.toString());//可以做UI操作
- }
- });
- }
-
-
- //TODO Retrofit+RxJava 查询 项目分类的 去获取项目列表数据
- public void getProjectListAction(View view) {
- disposable = api.getProjectItem(1, 294)
- .compose(rxUD())
- .subscribe(projectItem -> {
- Log.d(TAG, "accept: " + projectItem.toString());//可以做UI操作
- });
- }
-
-
- /**
- * 封装我们线程调度的操作
- * UD : upstream 上游 , downstream 下游
- */
- public static <UD> ObservableTransformer<UD, UD> rxUD() {
- return upstream -> {
- return upstream.subscribeOn(Schedulers.io()) //给上面代码(subscribeOn)分配异步线程
- .observeOn(AndroidSchedulers.mainThread()); //给下面代码(observeOn)分配主线程
- };
- }
-
-
- @Override
- protected void onDestroy() {
- super.onDestroy();
- if (disposable != null && !disposable.isDisposed()) {
- disposable.dispose();
- }
- }
-
-
- public class UseActivity extends AppCompatActivity {
- private WanAndroidApi api;
- public static final String TAG = "rxJava";
-
-
- @Override
- protected void onCreate(Bundle savedInstanceState) {
- super.onCreate(savedInstanceState);
- setContentView(R.layout.activity_use);
- api = HttpUtil.getOnlineCookieRetrofit().create(WanAndroidApi.class);
- antiShakeAction();
- }
-
-
-
-
- //TODO Retrofit+RxJava 查询 项目分类 (总数据查询)
- Disposable disposable;
-
-
- /**
- * TODO 自定义 功能防抖
- */
- private void antiShakeAction() {
- //对Button防抖动
- Button btn = findViewById(R.id.bt_anti_shake);
- disposable = RxView.clicks(btn)
- .throttleFirst(2000, TimeUnit.MILLISECONDS) //两秒内 响应一次
- .subscribe(new Consumer<Object>() {
- @Override
- public void accept(Object o) throws Exception {
- Log.d(TAG, "accept: 我响应了一次");
- }
- });
- }
-
-
- //TODO 防抖功能 + 网络嵌套 (解决网络嵌套问题) flatMap
- private void antiShakeActionUpdate() {
- //对Button防抖动
- Button btn = findViewById(R.id.bt_anti_shake);
- disposable = RxView.clicks(btn)//使用RxView来进行防抖
- .throttleFirst(2000, TimeUnit.MILLISECONDS) //两秒内 响应一次
- //我只给下面切换 异步线程
- .observeOn(Schedulers.io())
- //使用flatMap 自己分发 10个数据 onNext(1)-> 给下面 1-->多分发 10个数据
- //将Object 转换成 ObservableSource
传递给下一层 - .flatMap(new Function<Object, ObservableSource<ProjectBean>>() {
- @Override
- public ObservableSource<ProjectBean> apply(Object o) throws Exception {
- return api.getProject(); //返回 Observable
- }
- })
- //将 ProjectBean 转换为 ObservableSource
传递给下一层 - .flatMap(new Function<ProjectBean, ObservableSource<ProjectBean.DataBean>>() {
- @Override
- public ObservableSource<ProjectBean.DataBean> apply(@NonNull ProjectBean projectBean) {
- return Observable.fromIterable(projectBean.getData());//自己搞一个发射器 发多次;;
- }
- })
- // 将ProjectBean.DataBean 转换成 ObservableSource
传递给下一层 - .flatMap(new Function<ProjectBean.DataBean, ObservableSource<ProjectItem>>() {
- @Override
- public ObservableSource<ProjectItem> apply(@NonNull ProjectBean.DataBean dataBean) {
- return api.getProjectItem(1, dataBean.getId()); //获取Item数据
- }
- })
- .observeOn(AndroidSchedulers.mainThread()) //给下面使用的线程
- .subscribe(new Consumer<ProjectItem>() {
- @Override
- public void accept(ProjectItem projectItem) {
- //如果我要更新UI,要切会主线程
- Log.d(TAG, "antiShakeAction: " + projectItem.toString());
- }
- });
- }
-
-
- @Override
- protected void onDestroy() {
- super.onDestroy();
- if (disposable != null && !disposable.isDisposed()) {
- disposable.dispose();
- }
- }
请求服务器注册(耗时操作)
↓
更新注册UI(main线程)
↓
请求服务器登录(耗时操作)
↓
更新注册UI(main线程)
玩Android 开放API-玩Android - wanandroid.com
https://www.bejson.com/json2javapojo/new/%27
玩Android 开放API-玩Android - wanandroid.com
https://www.bejson.com/json2javapojo/new/%27
- RxJava 1.x
- @NonNull
- public static
Observable onAssembly(@NonNull Observable source ) { - //预留给2.x
- return source;
- }
-
-
- RxJava2.x RxJavaPlugins中
- @NonNull
- public static
Observable onAssembly(@NonNull Observable source ) { - //默认情况下 f==null
- Function super Observable, ? extends Observable> f = onObservableAssembly;
- if (f != null) {
- return apply(f, source);
- }
- return source;
- }
- public static void setOnObservableAssembly(@Nullable Function super Observable, ? extends Observable> onObservableAssembly) {
- if (lockdown) throw new IllegalStateException("Plugins can't be changed anymore");
-
-
- RxJavaPlugins.onObservableAssembly = onObservableAssembly;
- }
- public static void main(String[] args) {
- //Hook之前的监听 是static 全局的 Hook 很多操作符都会经过[RxJavaPlugins.onAssembly]
- RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
- @Override
- public Observable apply(Observable observable) throws Exception {
- System.out.println("apply: 整个项目全局监听到底有多少地方使用了 RxJava"+observable);
- return observable;//不破坏人家的功能
- }
- });
- testJust();
- }
- public static void testJust(){
- Observable.just(1, 2, 4).subscribe(new Consumer<Integer>() {
- @Override
- public void accept(Integer integer) throws Exception {
- System.out.println("integer==>" + integer);
- }
- });
- }
- //执行结果
- 整个项目全局监听 observable: io.reactivex.internal.operators.observable.ObservableFromArray@64a294a6
- integer==>1
- integer==>2
- integer==>4

- //起点被观察者
- Observable.just(PATH) //TODO 第二步 内部会分发 返回 Observable
- //TODO 第三步 卡片式拦截 将String 转换为 Bitmap
- .map(new Function
() { - @Override
- public Bitmap apply(String s) throws Exception {
- URL url = new URL(PATH);
- HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- conn.setConnectTimeout(5000);
- int responseCode = conn.getResponseCode();
- if (responseCode == HttpURLConnection.HTTP_OK) {
- InputStream inputStream = conn.getInputStream();
- return BitmapFactory.decodeStream(inputStream);
- }
- return null;
- }
- })
- //日志记录
- .map(new Function
() { - @Override
- public Bitmap apply(Bitmap bitmap) throws Exception {
- Log.d(TAG, "apply: " + getNormalTime(System.currentTimeMillis()) + "下载了图片");
- return bitmap;//将上个返回的数据继续流给下游;
- }
- })
- .map(new Function
() { - @Override
- public Bitmap apply(Bitmap bitmap) throws Exception {
- Paint paint = new Paint();
- paint.setTextSize(88);
- paint.setColor(Color.RED);
- return drawTextToBitmap(bitmap, "NorthStar", paint, 88, 88);
- }
- })
- //TODO 将上下游调度网络的操作放入compose中
- //.subscribeOn(Schedulers.io()) //给上面代码 分配异步线程
- //.observeOn(AndroidSchedulers.mainThread()) //给下面代码分配主线程
- .compose(rxUD())
- //TODO 订阅 起点 和 终点 订阅起来 上层改变 下层会响应改变
- .subscribe(
- //终点观察者
- new Observer
() { - @Override//TODO 第一步 订阅开始 预备开始要分发
- public void onSubscribe(Disposable d) {
- progressDialog = new ProgressDialog(DownloadActivity.this);
- progressDialog.setTitle("下载图片中...");
- progressDialog.show();
- }
-
-
- @Override// TODO 第四步 拿到事件
- public void onNext(Bitmap bitmap) {
- iv.setImageBitmap(bitmap);
- }
-
-
- @Override//错误事件
- public void onError(Throwable e) {
- Log.d(TAG, "onError: " + e.getMessage());
- }
-
-
- @Override//TODO 第五步 完成事件
- public void onComplete() {
- if (progressDialog != null) {
- progressDialog.dismiss();
- }
- }
- });
-
-
- /**
- * 封装我们线程调度的操作
- * UD : upstream 上游 , downstream 下游
- */
- public static
ObservableTransformer rxUD() { - return new ObservableTransformer
() { - @Override
- public ObservableSource
apply(Observable upstream) { - return upstream.subscribeOn(Schedulers.io()) //给上面代码分配异步线程
- .observeOn(AndroidSchedulers.mainThread()) //给下面代码分配主线程
- .map(new Function
() { - @Override
- public UD apply(UD ud) throws Exception {
- Log.d(TAG, "apply: 我监听到你了, 居然在执行");
- return ud;
- }
- });
- }
- };
- }
-
-
- //2.Observable 被观察者/发布者 起点 创建过程 : new ObservableCreate(){source=自定义source}
- //传入自定义source → creat(ObservableOnSubscribe
soure - //ObservableCreate
extends Observable - Observable.create(
- // 传入自定义source → creat(ObservableOnSubscribe
soure - new ObservableOnSubscribe
() { - @Override
- public void subscribe(ObservableEmitter
emitter) throws Exception { - emitter.onNext("A");
- }
- })
- // ObservableCreate.map
- // ObservableMap
extends AbstractObservableWithUpstream - // AbstractObservableWithUpstream
extends Observable implements HasUpstreamObservableSource - // ObservableSource
source(); - .map(new Function
() { //发送一次 - @Override
- public Bitmap apply(String s) throws Exception {
- return null;
- }
- })
- .flatMap(new Function
>() { //发送很多次 - @Override
- public ObservableSource
apply(Bitmap bitmap) throws Exception { - ObservableSource
bmp=new Observable() { - @Override
- protected void subscribeActual(Observer super Bitmap> observer) {
-
-
- }
- };
- return bmp;
- }
- })
- .doOnNext(new Consumer
() { - @Override
- public void accept(Bitmap o) throws Exception {
-
-
- }
- })
- .subscribeOn(Schedulers.io()) //给上面调度 异步线程
- .observeOn(AndroidSchedulers.mainThread()) //给下面调度 主线程
- //3.subscribe 订阅过程 ObservableMap.subscribe
- .subscribe(
- //1.自定义观察者 Observer 终点
- new Observer
() { //interface - @Override // 谁调subscribe 就在那个线程中,不参与调度
- public void onSubscribe(Disposable d) {
-
-
- }
-
-
- @Override
- public void onNext(Bitmap aBoolean) {
-
-
- }
-
-
- @Override
- public void onError(Throwable e) {
-
-
- }
-
-
- @Override
- public void onComplete() {
-
-
- }
- });
-
-
- }
-
-
自定义source ObservableCreate subscribe订阅 ← 自定义观察者
CreateEmitter.OnNext() subscribeActual(自定义观察者) ← subscribeActual new Observer
↖ ↘ ↓ 创建发射器
↖ ↘CreateEmitter(自定义观察者)---------------------------->onNext();
↖ ↓ 调用
[new ObservableOnSubscribe
callback ↖ ↓
↖source.subscribe(发射器)
Observable(被观察者) ObservableOnSubscribe RxJavaPlugins ObservableCreate
| | | |
|1.new ObservableOnSubscribe() | | |
|------------------------------------->| | |
|─┐2.Create | | |
|<┘ | | |
| 3.onAssembly(new ObservableCreate
|------------------------------------------------------->|4.new ObservableCreate()|
| | |----------------------->|
| 6.返回ObservableCreate对象 | 5.ObservableCreate |
|<-------------------------------------------------------|<-----------------------|
| | | |
Observable(被观察者) ObservableOnSubscribe RxJavaPlugins ObservableCreate

标准观察者模式 RxJava(耦合度低)
↙Observer1 被观察者(发布) 抽象层(发射器) map
Observable ← Observer2 ObservableOnSubscribe CreateEmitter map
List observers ↖Observer3 source.onNext()------->.onNext() ↓
add observer ... ↘ 自定义观察者(订阅)
remove observer Observer
notify observer .onNext()

一个Map源码分析:
代码区域 流程区域
Observable.create | | ObservableCreate ObservableMap Observable
↓ | |← subscribeActual(obse) ← subscribeActual(obse) ← .subscribe
ObservableCreate |ObservableOnSubscribe|
↓ | 自定义 source | ↓ ↓ ↓
ObservableMap | |
↓ | |--->CreateEmitter--------->MapObserver--------->自定义Observer
Observable.subscribe | |



- /**
- * Calls the associated hook function.
- * @param defaultScheduler the hook's input value
- * @return the value returned by the hook
- */
- @NonNull
- public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
- Function super Scheduler, ? extends Scheduler> f = onIoHandler;
- //默认是null
- if (f == null) {
- return defaultScheduler;
- }
- return apply(f, defaultScheduler);
- }
-
-
- 最终 Scheduler IO scheduler.io()
-
-
- /**
- * Sets the specific hook function.
- * @param handler the hook function to set, null allowed
- */
- public static void setIoSchedulerHandler(@Nullable Function super Scheduler, ? extends Scheduler> handler) {
- if (lockdown) {
- throw new IllegalStateException("Plugins can't be changed anymore");
- }
- onIoHandler = handler;
- }
-
-
-
-
-
-
- Schedulers有很多细节
- //TODO 1.Schedulers.io() 线程策略机制
- Schedulers----> Scheduler IO---->new IOTask()--->DEFAULT =new IoScheduler()-->线程池 ExecutorService
-
-
- .subscribeOn(
- //RxJavaPlugins.onIoScheduler(IO); HOOK技术 Calls the associated hook function.
- Schedulers.io() //策略机制
- )
- @NonNull
- public static Scheduler io() {
- return RxJavaPlugins.onIoScheduler(IO);
- }
- IO = RxJavaPlugins.initIoScheduler(new IOTask());
-
-
- static final class IOTask implements Callable
{//有返回值的任务 返回Scheduler - @Override
- public Scheduler call() throws Exception {
- return IoHolder.DEFAULT;
- }
- }
- static final Scheduler DEFAULT = new IoScheduler();
-
-
- public IoScheduler() { this(WORKER_THREAD_FACTORY); }
-
-
- public IoScheduler(ThreadFactory threadFactory) {
- this.threadFactory = threadFactory;
- this.pool = new AtomicReference
(NONE); //线程池 - start();
- }
-
-
- //TODO 2.subscribeOn() new IoScheduler---线程池 传入
- public final Observable
subscribeOn(Scheduler scheduler) { - ObjectHelper.requireNonNull(scheduler, "scheduler is null");
- return RxJavaPlugins.onAssembly(new ObservableSubscribeOn
(this, scheduler)); - }
-
-
- public ObservableSubscribeOn(ObservableSource
source, Scheduler scheduler) { - super(source);
- this.scheduler = scheduler;
- }
- //在终点 订阅时 会调用 subscribeActual()
- public final void subscribe(Observer super T> observer) {
- subscribeActual(observer);
- }
-
-
- //这个在ObservableSubscribeOn中
- @Override
- public void subscribeActual(final Observer super T> s) {
- final SubscribeOnObserver
parent = new SubscribeOnObserver(s); - s.onSubscribe(parent);
- parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
- }
-
-
- final class SubscribeTask implements Runnable {
- private final SubscribeOnObserver
parent; -
-
- SubscribeTask(SubscribeOnObserver
parent) { - this.parent = parent;
- }
-
-
- @Override
- public void run() {
- source.subscribe(parent);
- }
- }
-
-
- //ioScheduler(线程池).scheduleDirect(new SubscribeTask(parent))
- public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
- final Worker w = createWorker(); == IoScheduler.createWorker(){return new EventLoopWorker(pool.get());}
- final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);//自己包装了 Runnable
- DisposeTask task = new DisposeTask(decoratedRun, w);//包装一层 Runnable
- w.schedule(task, delay, unit);//↓ EventLoopWorker.schedule
- return task;
- }
-
-
- public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
- return threadWorker.scheduleActual(action, delayTime, unit, tasks);//↓
- }
-
-
- @NonNull
- public ScheduledRunnable scheduleActual(final Runnable run, long delayTime,
- @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
- Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
- ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
- Future> f;
- try {
- if (delayTime <= 0) {
- f = executor.submit((Callable
- } else {
- f = executor.schedule((Callable
- }
- sr.setFuture(f);
- }
- return sr;
- }
-
-
属于异步线程
自定义 ObservableCreate SubscribeTask ObservableSubscribeOn<--subscribe(订阅)<-- 终点
source subscribeActual(包裹1) subscribeActual(终点) subscribeActual Observer
↖ ↓ run{ ↓
包裹2.onNext source.subXX(包裹2) <--source.subXX() SubscribeOnObserver(Observer)
↘ ↓ } 终点存放
CreateEmitter ↓
包裹1存放 包裹2 线程池
onNext(){ executor.submit((Callable
包裹1.onNext() 这时从线程池中执行异步任务
}
异步线程
Scheduler MAIN_THREAD <---------------------------- ObserverOn(AndroidSchedulers.mainThread())
↓
DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));-> HandlerScheduler$.HandlerWork
scheduler{handler.sendMessage(run)}


