• 谈谈WorkManager线程池的设计


    这篇文章是基于WorkManager原理的一个拓展,如果不熟悉WorkManager原理的可以先看一下下面这篇文章:

    WorkManager 一篇文章就够了_AD钙奶-lalala的博客-CSDN博客activity里面先构建data,然后通过setInputData传入data,最后woker里面从params里面接收data。既然activity可以向worker里面发送data,那么woker里面该如何回传data呢?我们工程中一定会有大量activity和WorkManager传递参数的场景,那么该如何传递参数呢?很明显,WorkManager里面又封装了LiveData,是通过LiveData监听数据返回的。注意:这个doWork方法是异步的,后面分析源码时会讲到。......https://blog.csdn.net/qq_36428821/article/details/126317993?spm=1001.2014.3001.5501

    我不知道大家在看WorkManager源码的时候有没有这样一个疑惑,就是一个线程池里面执行一个Runnable,然后这个Runnable里面好像又有一个线程池执行Runnable,这样不就是线程里面套线程了吗?很显然,Google是不可能犯这样低级的错误的,那么我们就一起来看下这个线程池倒计时怎么设计的吧!

    我们先来看一下下面的代码,不理解的翻一下上一篇文章:

    1. @Override
    2. public @NonNull Operation enqueue() {
    3. // Only enqueue if not already enqueued.
    4. if (!mEnqueued) {
    5. EnqueueRunnable runnable = new EnqueueRunnable(this);
    6. mWorkManagerImpl.getWorkTaskExecutor()
    7. .executeOnBackgroundThread(runnable);//1
    8. mOperation = runnable.getOperation();
    9. } else {
    10. Logger.get().warning(TAG,
    11. String.format("Already enqueued work ids (%s)", TextUtils.join(", ", mIds)));
    12. }
    13. return mOperation;
    14. }

    这是WorkContinuationImpl里面的enqueue方法,我们看源码知道EnqueueRunnable继承了Runnable接口,注释1处的意思是将这个Runnable放到线程池里面去执行。但是,真的是这样吗?

    EnqueueRunnable里run方法又会调用:

    1. @VisibleForTesting
    2. public void scheduleWorkInBackground() {
    3. WorkManagerImpl workManager = mWorkContinuation.getWorkManagerImpl();
    4. Schedulers.schedule(
    5. workManager.getConfiguration(),
    6. workManager.getWorkDatabase(),
    7. workManager.getSchedulers());
    8. }

    最后会走到GreedyScheduler里面:

    mWorkManagerImpl.startWork(workSpec.id);

    回到WorkManagerImpl:

    1. @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
    2. public void startWork(@NonNull String workSpecId) {
    3. startWork(workSpecId, null);
    4. }
    5. @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
    6. public void startWork(
    7. @NonNull String workSpecId,
    8. @Nullable WorkerParameters.RuntimeExtras runtimeExtras) {
    9. mWorkTaskExecutor
    10. .executeOnBackgroundThread(
    11. new StartWorkRunnable(this, workSpecId, runtimeExtras));
    12. }

    问题来了,到目前为止一路走下来怎么出现了两次executeOnBackgroudThread?

    我们先来看看这个mWorkTaskExecutor究竟是个什么东西?

    注:mWorkManagerImpl.getWorkTaskExecutor()返回的也是mWorkManagerImpl。

    1. private void internalInit(@NonNull Context context,
    2. @NonNull Configuration configuration,
    3. @NonNull TaskExecutor workTaskExecutor,
    4. @NonNull WorkDatabase workDatabase,
    5. @NonNull List schedulers,
    6. @NonNull Processor processor) {
    7. ···
    8. mWorkTaskExecutor = workTaskExecutor;
    9. ···
    10. }

    在WorkManagerImpl里面找到初始化的地方了,接着看调用这个方法是在什么地方。

    1. @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
    2. public WorkManagerImpl(
    3. @NonNull Context context,
    4. @NonNull Configuration configuration,
    5. @NonNull TaskExecutor workTaskExecutor,
    6. @NonNull WorkDatabase database) {
    7. Context applicationContext = context.getApplicationContext();
    8. Logger.setLogger(new Logger.LogcatLogger(configuration.getMinimumLoggingLevel()));
    9. List schedulers =
    10. createSchedulers(applicationContext, configuration, workTaskExecutor);
    11. Processor processor = new Processor(
    12. context,
    13. configuration,
    14. workTaskExecutor,
    15. database,
    16. schedulers);
    17. internalInit(context, configuration, workTaskExecutor, database, schedulers, processor);
    18. }

    接着往上找:

    1. public static void initialize(@NonNull Context context, @NonNull Configuration configuration) {
    2. synchronized (sLock) {
    3. if (sDelegatedInstance != null && sDefaultInstance != null) {
    4. throw new IllegalStateException("WorkManager is already initialized. Did you "
    5. + "try to initialize it manually without disabling "
    6. + "WorkManagerInitializer? See "
    7. + "WorkManager#initialize(Context, Configuration) or the class level "
    8. + "Javadoc for more information.");
    9. }
    10. if (sDelegatedInstance == null) {
    11. context = context.getApplicationContext();
    12. if (sDefaultInstance == null) {
    13. sDefaultInstance = new WorkManagerImpl(
    14. context,
    15. configuration,
    16. new WorkManagerTaskExecutor(
    17. configuration.getTaskExecutor()));//1
    18. }
    19. sDelegatedInstance = sDefaultInstance;
    20. }
    21. }
    22. }

    我们找到了WorkManagerImpl初始化的时候,注意注释1处,原来这是一个WorkManagerTaskExecutor!前面嵌套调用的方法:

    1. @Override
    2. public void executeOnBackgroundThread(Runnable runnable) {
    3. mBackgroundExecutor.execute(runnable);
    4. }

    里面又调用了一层,接着看:

    1. private final SerialExecutor mBackgroundExecutor;
    2. public WorkManagerTaskExecutor(@NonNull Executor backgroundExecutor) {
    3. // Wrap it with a serial executor so we have ordering guarantees on commands
    4. // being executed.
    5. mBackgroundExecutor = new SerialExecutor(backgroundExecutor);
    6. }

    原来调用的是SerialExcutor的execute方法!在回到注释1处的WorkManagerTaskExcutor的初始化:

    new WorkManagerTaskExecutor(configuration.getTaskExecutor())

    可见:SerialExecutor里面传入的是configuration.getTaskExecutor()。

    接着看:

    1. @NonNull
    2. public Executor getTaskExecutor() {
    3. return mTaskExecutor;
    4. }
    5. mTaskExecutor = createDefaultExecutor(true /* isTaskExecutor */);
    6. private @NonNull Executor createDefaultExecutor(boolean isTaskExecutor) {
    7. return Executors.newFixedThreadPool(
    8. // This value is the same as the core pool size for AsyncTask#THREAD_POOL_EXECUTOR.
    9. Math.max(2,
    10. Math.min(Runtime.getRuntime().availableProcessors() - 1,
    11. 4)),
    12. createDefaultThreadFactory(isTaskExecutor));
    13. }

    可见,线程池原来在这里创建的!

    1. public class SerialExecutor implements Executor {
    2. private final ArrayDeque mTasks;
    3. private final Executor mExecutor;
    4. private final Object mLock;
    5. private volatile Runnable mActive;
    6. public SerialExecutor(@NonNull Executor executor) {
    7. mExecutor = executor;
    8. mTasks = new ArrayDeque<>();
    9. mLock = new Object();
    10. }
    11. @Override
    12. public void execute(@NonNull Runnable command) {
    13. synchronized (mLock) {
    14. mTasks.add(new Task(this, command));
    15. if (mActive == null) {
    16. scheduleNext();
    17. }
    18. }
    19. }
    20. // Synthetic access
    21. void scheduleNext() {
    22. synchronized (mLock) {
    23. if ((mActive = mTasks.poll()) != null) {
    24. mExecutor.execute(mActive);
    25. }
    26. }
    27. }
    28. /**
    29. * @return {@code true} if there are tasks to execute in the queue.
    30. */
    31. public boolean hasPendingTasks() {
    32. synchronized (mLock) {
    33. return !mTasks.isEmpty();
    34. }
    35. }
    36. @NonNull
    37. @VisibleForTesting
    38. public Executor getDelegatedExecutor() {
    39. return mExecutor;
    40. }
    41. /**
    42. * A {@link Runnable} which tells the {@link SerialExecutor} to schedule the next command
    43. * after completion.
    44. */
    45. static class Task implements Runnable {
    46. final SerialExecutor mSerialExecutor;
    47. final Runnable mRunnable;
    48. Task(@NonNull SerialExecutor serialExecutor, @NonNull Runnable runnable) {
    49. mSerialExecutor = serialExecutor;
    50. mRunnable = runnable;
    51. }
    52. @Override
    53. public void run() {
    54. try {
    55. mRunnable.run();
    56. } finally {
    57. mSerialExecutor.scheduleNext();
    58. }
    59. }
    60. }
    61. }

    阶段性总结一下:mWorkTaskExecutor.executeOnBackgroundThread中的mWorkTaskExecutor是WorkManagerTaskExecutor,executeOnBackgroundThread调用链条:

    executeOnBackgroundThread()->SerialExecutor的execute(),我们来重点看下这个方法:

    1. @Override
    2. public void execute(@NonNull Runnable command) {
    3. synchronized (mLock) {
    4. mTasks.add(new Task(this, command));
    5. if (mActive == null) {
    6. scheduleNext();
    7. }
    8. }
    9. }

    仅仅是把任务加入了Task,然后调用Runnable自身的run方法,跟线程池没有半毛钱关系:

    1. static class Task implements Runnable {
    2. final SerialExecutor mSerialExecutor;
    3. final Runnable mRunnable;
    4. Task(@NonNull SerialExecutor serialExecutor, @NonNull Runnable runnable) {
    5. mSerialExecutor = serialExecutor;
    6. mRunnable = runnable;
    7. }
    8. @Override
    9. public void run() {
    10. try {
    11. mRunnable.run();
    12. } finally {
    13. mSerialExecutor.scheduleNext();
    14. }
    15. }
    16. }

    所有嵌套任务都加入Task后,调用 scheduleNext():

    1. // Synthetic access
    2. void scheduleNext() {
    3. synchronized (mLock) {
    4. if ((mActive = mTasks.poll()) != null) {
    5. mExecutor.execute(mActive);
    6. }
    7. }
    8. }

    这个才是最终线程池执行的任务!也就是最里面的一层。这下大家清楚了吧,这个设计确实非常巧妙!

  • 相关阅读:
    gpf_maskf分配掩码描述
    头歌平台 | 逻辑函数及其描述工具logisim使用
    基于SSM的社会救助信息管理毕业设计源码211633
    【脑与认知科学】【n-back游戏】
    Dubbo各种协议
    Vue使用CryptoJS实现前后端密码加密
    OpenCV入门(C++/Python)- 使用OpenCV读取和编写视频(二)
    第一百五十七回 SliverList组件
    旅游景区开发小程序有什么好处
    Mybatis Plus配置多个数据源
  • 原文地址:https://blog.csdn.net/qq_36428821/article/details/126332323