• 7. RxJava总结


    1.Rx思维 (起点到终点链条不断,响应式编程)

    起点(分发事件(Path): 我饿了)--下楼--去餐厅---点餐--->终点(吃饭消费事件)

    程序中的例子

    起点(分发事件,点击登录)---登录API---请求服务器---获取响应码---->终点(更新UI登录成功 消费事件)

    2.核心思想

    有一个起点 和 一个终点, 起点开始流向我们的"事件" 把事件流向到终点,只不过在流向的过程中,可以增加拦截,

    拦截时可以对"事件进行改变", 终点只关心它上一个拦截;

    ---------------------------------------------------------------

    Observable 根据上层变化而变化过滤 Observer

    被观察者 订阅(上层区域 和 下层区域 关联) 观察者

    起点 终点

    ---------------------------------------------------------------

    标准观察者: RxJava的观察者模式:

    3.RxJava配合Retrofit

    1> 数据模块

    1. // 总数据Bean
    2. public class ProjectBean {
    3. private int errorCode;
    4. private String errorMsg;
    5. private List data;
    6. public static class DataBean {
    7. private int courseId;
    8. private int id;
    9. private String name;
    10. private int order;
    11. private int parentChapterId;
    12. private boolean userControlSetTop;
    13. private int visible;
    14. private List> children;
    15. }
    16. }
    17. // Item数据
    18. public class ProjectItem {
    19. private DataBean data;
    20. private int errorCode;
    21. private String errorMsg;
    22. public static class DataBean {
    23. private int curPage;
    24. private int offset;
    25. private boolean over;
    26. private int pageCount;
    27. private int size;
    28. private int total;
    29. private List datas;
    30. public static class DatasBean {
    31. private String apkLink;
    32. private String desc;
    33. private String envelopePic;
    34. private boolean fresh;
    35. private int id;
    36. private String link;
    37. private String niceDate;
    38. private String origin;
    39. private String prefix;
    40. private String projectLink;
    41. private List tags;
    42. public static class TagsBean {
    43. private String name; // 项目
    44. private String url; ///project/list/1?cid=294
    45. }
    46. }
    47. }
    48. }

    2> Api模块

    1. public interface WanAndroidApi {
    2. //总数据
    3. @GET("project/tree/json")
    4. Observable getProject(); //异步线程耗时操作
    5. //Item数据
    6. @GET("project/list/{pageIndex}/json") //?cid=294 异步线程 耗时操作
    7. Observable getProjectItem(@Path("pageIndex") int pageIndex, @Query("cid") int cid);
    8. }

    3> 业务操

    
    
    
    1. public class UseActivity extends AppCompatActivity {
    2. private WanAndroidApi api;
    3. public static final String TAG = "rxJava";
    4. @Override
    5. protected void onCreate(Bundle savedInstanceState) {
    6. super.onCreate(savedInstanceState);
    7. setContentView(R.layout.activity_use);
    8. api = HttpUtil.getOnlineCookieRetrofit().create(WanAndroidApi.class);
    9. antiShakeAction();
    10. }
    11. //TODO Retrofit+RxJava 查询 项目分类 (总数据查询)
    12. Disposable disposable;
    13. public void getProjectAction(View view) {
    14. disposable = api.getProject()
    15. .subscribeOn(Schedulers.io()) //给上面分配异步线程
    16. .observeOn(AndroidSchedulers.mainThread())//给下面主线程
    17. .subscribe(new Consumer<ProjectBean>() {//简化版
    18. @Override
    19. public void accept(ProjectBean projectBean) throws Exception {
    20. Log.d(TAG, "accept: " + projectBean.toString());//可以做UI操作
    21. }
    22. });
    23. }
    24. //TODO Retrofit+RxJava 查询 项目分类的 去获取项目列表数据
    25. public void getProjectListAction(View view) {
    26. disposable = api.getProjectItem(1, 294)
    27. .compose(rxUD())
    28. .subscribe(projectItem -> {
    29. Log.d(TAG, "accept: " + projectItem.toString());//可以做UI操作
    30. });
    31. }
    32. /**
    33. * 封装我们线程调度的操作
    34. * UD : upstream 上游 , downstream 下游
    35. */
    36. public static <UD> ObservableTransformer<UD, UD> rxUD() {
    37. return upstream -> {
    38. return upstream.subscribeOn(Schedulers.io()) //给上面代码(subscribeOn)分配异步线程
    39. .observeOn(AndroidSchedulers.mainThread()); //给下面代码(observeOn)分配主线程
    40. };
    41. }
    42. @Override
    43. protected void onDestroy() {
    44. super.onDestroy();
    45. if (disposable != null && !disposable.isDisposed()) {
    46. disposable.dispose();
    47. }
    48. }

    4.防抖 rxBinding 与 网络嵌套

    1. public class UseActivity extends AppCompatActivity {
    2. private WanAndroidApi api;
    3. public static final String TAG = "rxJava";
    4. @Override
    5. protected void onCreate(Bundle savedInstanceState) {
    6. super.onCreate(savedInstanceState);
    7. setContentView(R.layout.activity_use);
    8. api = HttpUtil.getOnlineCookieRetrofit().create(WanAndroidApi.class);
    9. antiShakeAction();
    10. }
    11. //TODO Retrofit+RxJava 查询 项目分类 (总数据查询)
    12. Disposable disposable;
    13. /**
    14. * TODO 自定义 功能防抖
    15. */
    16. private void antiShakeAction() {
    17. //对Button防抖动
    18. Button btn = findViewById(R.id.bt_anti_shake);
    19. disposable = RxView.clicks(btn)
    20. .throttleFirst(2000, TimeUnit.MILLISECONDS) //两秒内 响应一次
    21. .subscribe(new Consumer<Object>() {
    22. @Override
    23. public void accept(Object o) throws Exception {
    24. Log.d(TAG, "accept: 我响应了一次");
    25. }
    26. });
    27. }
    28. //TODO 防抖功能 + 网络嵌套 (解决网络嵌套问题) flatMap
    29. private void antiShakeActionUpdate() {
    30. //对Button防抖动
    31. Button btn = findViewById(R.id.bt_anti_shake);
    32. disposable = RxView.clicks(btn)//使用RxView来进行防抖
    33. .throttleFirst(2000, TimeUnit.MILLISECONDS) //两秒内 响应一次
    34. //我只给下面切换 异步线程
    35. .observeOn(Schedulers.io())
    36. //使用flatMap 自己分发 10个数据 onNext(1)-> 给下面 1-->多分发 10个数据
    37. //将Object 转换成 ObservableSource 传递给下一层
    38. .flatMap(new Function<Object, ObservableSource<ProjectBean>>() {
    39. @Override
    40. public ObservableSource<ProjectBean> apply(Object o) throws Exception {
    41. return api.getProject(); //返回 Observable
    42. }
    43. })
    44. //将 ProjectBean 转换为 ObservableSource 传递给下一层
    45. .flatMap(new Function<ProjectBean, ObservableSource<ProjectBean.DataBean>>() {
    46. @Override
    47. public ObservableSource<ProjectBean.DataBean> apply(@NonNull ProjectBean projectBean) {
    48. return Observable.fromIterable(projectBean.getData());//自己搞一个发射器 发多次;;
    49. }
    50. })
    51. // 将ProjectBean.DataBean 转换成 ObservableSource 传递给下一层
    52. .flatMap(new Function<ProjectBean.DataBean, ObservableSource<ProjectItem>>() {
    53. @Override
    54. public ObservableSource<ProjectItem> apply(@NonNull ProjectBean.DataBean dataBean) {
    55. return api.getProjectItem(1, dataBean.getId()); //获取Item数据
    56. }
    57. })
    58. .observeOn(AndroidSchedulers.mainThread()) //给下面使用的线程
    59. .subscribe(new Consumer<ProjectItem>() {
    60. @Override
    61. public void accept(ProjectItem projectItem) {
    62. //如果我要更新UI,要切会主线程
    63. Log.d(TAG, "antiShakeAction: " + projectItem.toString());
    64. }
    65. });
    66. }
    67. @Override
    68. protected void onDestroy() {
    69. super.onDestroy();
    70. if (disposable != null && !disposable.isDisposed()) {
    71. disposable.dispose();
    72. }
    73. }

    5.doOnNext(频繁的线程切换)

    请求服务器注册(耗时操作)

    更新注册UI(main线程)

    请求服务器登录(耗时操作)

    更新注册UI(main线程)

    GitHub - ReactiveX/RxJava: RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

    ReactiveX

    玩Android 开放API-玩Android - wanandroid.com

    https://www.bejson.com/json2javapojo/new/%27

    玩Android 开放API-玩Android - wanandroid.com

    https://www.bejson.com/json2javapojo/new/%27

    1> RxJava Hook

    1. RxJava 1.x
    2. @NonNull
    3. public static Observable onAssembly(@NonNull Observable source) {
    4. //预留给2.x
    5. return source;
    6. }
    7. RxJava2.x RxJavaPlugins
    8. @NonNull
    9. public static Observable onAssembly(@NonNull Observable source) {
    10. //默认情况下 f==null
    11. Functionsuper Observable, ? extends Observable> f = onObservableAssembly;
    12. if (f != null) {
    13. return apply(f, source);
    14. }
    15. return source;
    16. }
    17. public static void setOnObservableAssembly(@Nullable Functionsuper Observable, ? extends Observable> onObservableAssembly) {
    18. if (lockdown) throw new IllegalStateException("Plugins can't be changed anymore");
    19. RxJavaPlugins.onObservableAssembly = onObservableAssembly;
    20. }
    21. public static void main(String[] args) {
    22. //Hook之前的监听 是static 全局的 Hook 很多操作符都会经过[RxJavaPlugins.onAssembly]
    23. RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
    24. @Override
    25. public Observable apply(Observable observable) throws Exception {
    26. System.out.println("apply: 整个项目全局监听到底有多少地方使用了 RxJava"+observable);
    27. return observable;//不破坏人家的功能
    28. }
    29. });
    30. testJust();
    31. }
    32. public static void testJust(){
    33. Observable.just(1, 2, 4).subscribe(new Consumer<Integer>() {
    34. @Override
    35. public void accept(Integer integer) throws Exception {
    36. System.out.println("integer==>" + integer);
    37. }
    38. });
    39. }
    40. //执行结果
    41. 整个项目全局监听 observable: io.reactivex.internal.operators.observable.ObservableFromArray@64a294a6
    42. integer==>1
    43. integer==>2
    44. integer==>4

    2> RxJava的观察者模式

    1. //起点被观察者
    2. Observable.just(PATH) //TODO 第二步 内部会分发 返回 Observable
    3. //TODO 第三步 卡片式拦截 将String 转换为 Bitmap
    4. .map(new Function() {
    5. @Override
    6. public Bitmap apply(String s) throws Exception {
    7. URL url = new URL(PATH);
    8. HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    9. conn.setConnectTimeout(5000);
    10. int responseCode = conn.getResponseCode();
    11. if (responseCode == HttpURLConnection.HTTP_OK) {
    12. InputStream inputStream = conn.getInputStream();
    13. return BitmapFactory.decodeStream(inputStream);
    14. }
    15. return null;
    16. }
    17. })
    18. //日志记录
    19. .map(new Function() {
    20. @Override
    21. public Bitmap apply(Bitmap bitmap) throws Exception {
    22. Log.d(TAG, "apply: " + getNormalTime(System.currentTimeMillis()) + "下载了图片");
    23. return bitmap;//将上个返回的数据继续流给下游;
    24. }
    25. })
    26. .map(new Function() {
    27. @Override
    28. public Bitmap apply(Bitmap bitmap) throws Exception {
    29. Paint paint = new Paint();
    30. paint.setTextSize(88);
    31. paint.setColor(Color.RED);
    32. return drawTextToBitmap(bitmap, "NorthStar", paint, 88, 88);
    33. }
    34. })
    35. //TODO 将上下游调度网络的操作放入compose中
    36. //.subscribeOn(Schedulers.io()) //给上面代码 分配异步线程
    37. //.observeOn(AndroidSchedulers.mainThread()) //给下面代码分配主线程
    38. .compose(rxUD())
    39. //TODO 订阅 起点 和 终点 订阅起来 上层改变 下层会响应改变
    40. .subscribe(
    41. //终点观察者
    42. new Observer() {
    43. @Override//TODO 第一步 订阅开始 预备开始要分发
    44. public void onSubscribe(Disposable d) {
    45. progressDialog = new ProgressDialog(DownloadActivity.this);
    46. progressDialog.setTitle("下载图片中...");
    47. progressDialog.show();
    48. }
    49. @Override// TODO 第四步 拿到事件
    50. public void onNext(Bitmap bitmap) {
    51. iv.setImageBitmap(bitmap);
    52. }
    53. @Override//错误事件
    54. public void onError(Throwable e) {
    55. Log.d(TAG, "onError: " + e.getMessage());
    56. }
    57. @Override//TODO 第五步 完成事件
    58. public void onComplete() {
    59. if (progressDialog != null) {
    60. progressDialog.dismiss();
    61. }
    62. }
    63. });
    64. /**
    65. * 封装我们线程调度的操作
    66. * UD : upstream 上游 , downstream 下游
    67. */
    68. public static ObservableTransformer rxUD() {
    69. return new ObservableTransformer() {
    70. @Override
    71. public ObservableSource apply(Observable upstream) {
    72. return upstream.subscribeOn(Schedulers.io()) //给上面代码分配异步线程
    73. .observeOn(AndroidSchedulers.mainThread()) //给下面代码分配主线程
    74. .map(new Function() {
    75. @Override
    76. public UD apply(UD ud) throws Exception {
    77. Log.d(TAG, "apply: 我监听到你了, 居然在执行");
    78. return ud;
    79. }
    80. });
    81. }
    82. };
    83. }
    84. //2.Observable 被观察者/发布者 起点 创建过程 : new ObservableCreate(){source=自定义source}
    85. //传入自定义source → creat(ObservableOnSubscribe soure
    86. //ObservableCreate extends Observable
    87. Observable.create(
    88. // 传入自定义source → creat(ObservableOnSubscribe soure
    89. new ObservableOnSubscribe() {
    90. @Override
    91. public void subscribe(ObservableEmitter emitter) throws Exception {
    92. emitter.onNext("A");
    93. }
    94. })
    95. // ObservableCreate.map
    96. // ObservableMap extends AbstractObservableWithUpstream
    97. // AbstractObservableWithUpstream extends Observable implements HasUpstreamObservableSource
    98. // ObservableSource source();
    99. .map(new Function() { //发送一次
    100. @Override
    101. public Bitmap apply(String s) throws Exception {
    102. return null;
    103. }
    104. })
    105. .flatMap(new Function>() { //发送很多次
    106. @Override
    107. public ObservableSource apply(Bitmap bitmap) throws Exception {
    108. ObservableSource bmp=new Observable() {
    109. @Override
    110. protected void subscribeActual(Observersuper Bitmap> observer) {
    111. }
    112. };
    113. return bmp;
    114. }
    115. })
    116. .doOnNext(new Consumer() {
    117. @Override
    118. public void accept(Bitmap o) throws Exception {
    119. }
    120. })
    121. .subscribeOn(Schedulers.io()) //给上面调度 异步线程
    122. .observeOn(AndroidSchedulers.mainThread()) //给下面调度 主线程
    123. //3.subscribe 订阅过程 ObservableMap.subscribe
    124. .subscribe(
    125. //1.自定义观察者 Observer 终点
    126. new Observer() { //interface
    127. @Override // 谁调subscribe 就在那个线程中,不参与调度
    128. public void onSubscribe(Disposable d) {
    129. }
    130. @Override
    131. public void onNext(Bitmap aBoolean) {
    132. }
    133. @Override
    134. public void onError(Throwable e) {
    135. }
    136. @Override
    137. public void onComplete() {
    138. }
    139. });
    140. }

    自定义source ObservableCreate subscribe订阅 ← 自定义观察者

    CreateEmitter.OnNext() subscribeActual(自定义观察者) ← subscribeActual new Observer()

    ↖ ↘ ↓ 创建发射器

    ↖ ↘CreateEmitter(自定义观察者)---------------------------->onNext();

    ↓ 调用

    [new ObservableOnSubscribe{}] observer.onSubscribe()

    callback ↖ ↓

    ↖source.subscribe(发射器)

    3> Observable 创建过程时序图如下

    Observable(被观察者) ObservableOnSubscribe RxJavaPlugins ObservableCreate

    | | | |

    |1.new ObservableOnSubscribe() | | |

    |------------------------------------->| | |

    |─┐2.Create | | |

    |<┘ | | |

    | 3.onAssembly(new ObservableCreate()) | |

    |------------------------------------------------------->|4.new ObservableCreate()|

    | | |----------------------->|

    | 6.返回ObservableCreate对象 | 5.ObservableCreate |

    |<-------------------------------------------------------|<-----------------------|

    | | | |

    Observable(被观察者) ObservableOnSubscribe RxJavaPlugins ObservableCreate

    标准观察者模式 RxJava(耦合度低)

    Observer1 被观察者(发布) 抽象层(发射器) map

    Observable ← Observer2 ObservableOnSubscribe CreateEmitter map

    List observers Observer3 source.onNext()------->.onNext() ↓

    add observer ... ↘ 自定义观察者(订阅)

    remove observer Observer

    notify observer .onNext()

    4>ObservableMap源码分析

    一个Map源码分析:

    代码区域 流程区域

    Observable.create | | ObservableCreate ObservableMap Observable

    ↓ | |← subscribeActual(obse) subscribeActual(obse) .subscribe

    ObservableCreate |ObservableOnSubscribe|

    ↓ | 自定义 source | ↓ ↓ ↓

    ObservableMap | |

    ↓ | |--->CreateEmitter--------->MapObserver--------->自定义Observer

    Observable.subscribe | |

    5>RxJava线程切换

    1. /**
    2. * Calls the associated hook function.
    3. * @param defaultScheduler the hook's input value
    4. * @return the value returned by the hook
    5. */
    6. @NonNull
    7. public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
    8. Functionsuper Scheduler, ? extends Scheduler> f = onIoHandler;
    9. //默认是null
    10. if (f == null) {
    11. return defaultScheduler;
    12. }
    13. return apply(f, defaultScheduler);
    14. }
    15. 最终 Scheduler IO scheduler.io()
    16. /**
    17. * Sets the specific hook function.
    18. * @param handler the hook function to set, null allowed
    19. */
    20. public static void setIoSchedulerHandler(@Nullable Functionsuper Scheduler, ? extends Scheduler> handler) {
    21. if (lockdown) {
    22. throw new IllegalStateException("Plugins can't be changed anymore");
    23. }
    24. onIoHandler = handler;
    25. }
    26. Schedulers有很多细节
    27. //TODO 1.Schedulers.io() 线程策略机制
    28. Schedulers----> Scheduler IO---->new IOTask()--->DEFAULT =new IoScheduler()-->线程池 ExecutorService
    29. .subscribeOn(
    30. //RxJavaPlugins.onIoScheduler(IO); HOOK技术 Calls the associated hook function.
    31. Schedulers.io() //策略机制
    32. )
    33. @NonNull
    34. public static Scheduler io() {
    35. return RxJavaPlugins.onIoScheduler(IO);
    36. }
    37. IO = RxJavaPlugins.initIoScheduler(new IOTask());
    38. static final class IOTask implements Callable {//有返回值的任务 返回Scheduler
    39. @Override
    40. public Scheduler call() throws Exception {
    41. return IoHolder.DEFAULT;
    42. }
    43. }
    44. static final Scheduler DEFAULT = new IoScheduler();
    45. public IoScheduler() { this(WORKER_THREAD_FACTORY); }
    46. public IoScheduler(ThreadFactory threadFactory) {
    47. this.threadFactory = threadFactory;
    48. this.pool = new AtomicReference(NONE); //线程池
    49. start();
    50. }
    51. //TODO 2.subscribeOn() new IoScheduler---线程池 传入
    52. public final Observable subscribeOn(Scheduler scheduler) {
    53. ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    54. return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));
    55. }
    56. public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler) {
    57. super(source);
    58. this.scheduler = scheduler;
    59. }
    60. //在终点 订阅时 会调用 subscribeActual()
    61. public final void subscribe(Observersuper T> observer) {
    62. subscribeActual(observer);
    63. }
    64. //这个在ObservableSubscribeOn中
    65. @Override
    66. public void subscribeActual(final Observersuper T> s) {
    67. final SubscribeOnObserver parent = new SubscribeOnObserver(s);
    68. s.onSubscribe(parent);
    69. parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    70. }
    71. final class SubscribeTask implements Runnable {
    72. private final SubscribeOnObserver parent;
    73. SubscribeTask(SubscribeOnObserver parent) {
    74. this.parent = parent;
    75. }
    76. @Override
    77. public void run() {
    78. source.subscribe(parent);
    79. }
    80. }
    81. //ioScheduler(线程池).scheduleDirect(new SubscribeTask(parent))
    82. public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    83. final Worker w = createWorker(); == IoScheduler.createWorker(){return new EventLoopWorker(pool.get());}
    84. final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);//自己包装了 Runnable
    85. DisposeTask task = new DisposeTask(decoratedRun, w);//包装一层 Runnable
    86. w.schedule(task, delay, unit);//↓ EventLoopWorker.schedule
    87. return task;
    88. }
    89. public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    90. return threadWorker.scheduleActual(action, delayTime, unit, tasks);//↓
    91. }
    92. @NonNull
    93. public ScheduledRunnable scheduleActual(final Runnable run, long delayTime,
    94. @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    95. Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    96. ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    97. Future f;
    98. try {
    99. if (delayTime <= 0) {
    100. f = executor.submit((Callable)sr);//将任务最终交给线程池执行
    101. } else {
    102. f = executor.schedule((Callable)sr, delayTime, unit);
    103. }
    104. sr.setFuture(f);
    105. }
    106. return sr;
    107. }
    108. 6>SubscribeOn(Schedulers.io()) 时序图

      属于异步线程

      自定义 ObservableCreate SubscribeTask ObservableSubscribeOn<--subscribe(订阅)<-- 终点

      source subscribeActual(包裹1) subscribeActual(终点) subscribeActual Observer

      ↖ ↓ run{ ↓

      包裹2.onNext source.subXX(包裹2) <--source.subXX() SubscribeOnObserver(Observer)

      ↘ ↓ } 终点存放

      CreateEmitter ↓

      包裹1存放 包裹2 线程池

      onNext(){ executor.submit((Callable)sr)

      包裹1.onNext() 这时从线程池中执行异步任务

      }

      异步线程

      Scheduler MAIN_THREAD <---------------------------- ObserverOn(AndroidSchedulers.mainThread())

      DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));-> HandlerScheduler$.HandlerWork

      scheduler{handler.sendMessage(run)}

    109. 相关阅读:
      2022牛客暑期多校训练营7(BCFGJ)
      基于Kinect 动捕XR直播解决方案 - 硬件篇
      mysql innodb 存储引擎
      ubuntu20.04配置解压版mysql5.7
      企业如何做好供应链管理工作?8个步骤及应用详解!
      笔试训练2
      oak深度相机入门教程-人物跟踪
      java项目接口重复提交解决方案
      linux中 struct page 与物理地址的关系
      led护眼灯真的能护眼吗?Led护眼灯的好处
    110. 原文地址:https://blog.csdn.net/x910131/article/details/126071437