• 万字整理 | 深入理解工作队列


    伟林,中年码农,从事过电信、手机、安全、芯片等行业,目前依旧从事Linux方向开发工作,个人爱好Linux相关知识分享,个人微博CSDN pwl999,欢迎大家关注!

    • 1.1 worker_pool

      • 1.1.1 normal worker_pool

      • 1.1.2 unbound worker_pool

    • 1.2 worker

      • 1.2.1 worker处理work

      • 1.2.2 worker_pool动态管理worker

      • 1.2.3 cpu hotplug处理

    • 1.3 workqueue

      • 1.3.1 系统workqueue

      • 1.3.2 workqueue创建

      • 1.3.3 flush_workqueue()

    • 1.4 pool_workqueue

    • 1.5 work

      • 1.5.1 queue_work()

      • 1.5.2 flush_work()

    • 2.1 schedule_work()

    • 2.2 sschedule_work_on()

    • 2.3 schedule_delayed_work()

    workqueue是内核里面很重要的一个机制,特别是内核驱动,一般的小型任务(work)都不会自己起一个线程来处理,而是扔到workqueu中处理。workqueue的主要工作就是用进程上下文来处理内核中大量的小任务。

    所以workqueue的主要设计思想:一个是并行,多个work不要相互阻塞;另外一个是节省资源,多个work尽量共享资源(进程、调度、内存),不要造成系统过多的资源浪费。

    为了实现的设计思想,workqueue的设计实现也更新了很多版本。最新的workqueue实现叫做CMWQ(Concurrency Managed Workqueue),也就是用更加智能的算法来实现“并行和节省”。新版本的workque创建函数改成alloc_workqueue(),旧版本的函数create_*workqueue()逐渐会被被废弃。

    本文的代码分析基于linux kernel 3.18.22,最好的学习方法还是"read the fucking source code"

    1.CMWQ的几个基本概念

    关于workqueue中几个概念都是work相关的数据结构非常容易混淆,大概可以这样来理解:

    • work :工作。

    • workqueue :工作的集合。workqueue和work是一对多的关系。

    • worker :工人。在代码中worker对应一个work_thread()内核线程。

    • worker_pool:工人的集合。worker_pool和worker是一对多的关系。

    • pwq(pool_workqueue):中间人/中介,负责建立起workqueue和worker_pool之间的关系。workqueue和pwq是一对多的关系,pwq和worker_pool是一对一的关系。

    be21b93db66dab346566f0783fa72b6e.png

    最终的目的还是把work(工作)传递给worker(工人)去执行,中间的数据结构和各种关系目的是把这件事组织的更加清晰高效。

    1.1 worker_pool

    每个执行work的线程叫做worker,一组worker的集合叫做worker_pool。CMWQ的精髓就在worker_pool里面worker的动态增减管理上manage_workers()。

    CMWQ对worker_pool分成两类:

    • normal worker_pool,给通用的workqueue使用;

    • unbound worker_pool,给WQ_UNBOUND类型的的workqueue使用;

    1.1.1 normal worker_pool

    默认work是在normal worker_pool中处理的。系统的规划是每个cpu创建两个normal worker_pool:一个normal优先级(nice=0)、一个高优先级(nice=HIGHPRI_NICE_LEVEL),对应创建出来的worker的进程nice不一样。

    每个worker对应一个worker_thread()内核线程,一个worker_pool包含一个或者多个worker,worker_pool中worker的数量是根据worker_pool中work的负载来动态增减的。

    我们可以通过“ps|grep kworker”命令来查看所有worker对应的内核线程,normal worker_pool对应内核线程(worker_thread())的命名规则是这样的:

    1. snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
    2.    pool->attrs->nice < 0  ? "H" : "");
    3.  worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
    4.            "kworker/%s", id_buf);

    so类似名字是normal worker_pool:

    1. shell@PRO5:/ $ ps | grep "kworker"
    2. root      14    2     0      0     worker_thr 0000000000 S kworker/1:0H  // cpu1 高优先级worker_pool的第0个worker进程
    3. root      17    2     0      0     worker_thr 0000000000 S kworker/2:0  // cpu2 低优先级worker_pool的第0个worker进程
    4. root      18    2     0      0     worker_thr 0000000000 S kworker/2:0H  // cpu2 高优先级worker_pool的第0个worker进程
    5. root      23699 2     0      0     worker_thr 0000000000 S kworker/0:1  // cpu0 低优先级worker_pool的第1个worker进程
    85f50da19fa5f3efc3aecef1bde62804.png

    对应的拓扑图如下:

    b456b607050676cf7e7df208d47a927e.png

    以下是normal worker_pool详细的创建过程代码分析:

    • kernel/workqueue.c:

    • init_workqueues() -> init_worker_pool()/create_worker()

    1. static int __init init_workqueues(void)
    2. {
    3.  int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };
    4.  int i, cpu;
    5.  // (1)给每个cpu创建对应的worker_pool
    6.  /* initialize CPU pools */
    7.  for_each_possible_cpu(cpu) {
    8.   struct worker_pool *pool;
    9.   i = 0;
    10.   for_each_cpu_worker_pool(pool, cpu) {
    11.    BUG_ON(init_worker_pool(pool));
    12.    // 指定cpu
    13.    pool->cpu = cpu;
    14.    cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
    15.    // 指定进程优先级nice
    16.    pool->attrs->nice = std_nice[i++];
    17.    pool->node = cpu_to_node(cpu);
    18.    /* alloc pool ID */
    19.    mutex_lock(&wq_pool_mutex);
    20.    BUG_ON(worker_pool_assign_id(pool));
    21.    mutex_unlock(&wq_pool_mutex);
    22.   }
    23.  }
    24.  // (2)给每个worker_pool创建第一个worker
    25.  /* create the initial worker */
    26.  for_each_online_cpu(cpu) {
    27.   struct worker_pool *pool;
    28.   for_each_cpu_worker_pool(pool, cpu) {
    29.    pool->flags &= ~POOL_DISASSOCIATED;
    30.    BUG_ON(!create_worker(pool));
    31.   }
    32.  }
    33. }
    34. | →
    35. static int init_worker_pool(struct worker_pool *pool)
    36. {
    37.  spin_lock_init(&pool->lock);
    38.  pool->id = -1;
    39.  pool->cpu = -1;
    40.  pool->node = NUMA_NO_NODE;
    41.  pool->flags |= POOL_DISASSOCIATED;
    42.  // (1.1) worker_pool的work list,各个workqueue把work挂载到这个链表上,
    43.  // 让worker_pool对应的多个worker来执行
    44.  INIT_LIST_HEAD(&pool->worklist);
    45.  // (1.2) worker_pool的idle worker list,
    46.  // worker没有活干时,不会马上销毁,先进入idle状态备选
    47.  INIT_LIST_HEAD(&pool->idle_list);
    48.  // (1.3) worker_pool的busy worker list,
    49.  // worker正在干活,在执行work
    50.  hash_init(pool->busy_hash);
    51.  // (1.4) 检查idle状态worker是否需要destroy的timer
    52.  init_timer_deferrable(&pool->idle_timer);
    53.  pool->idle_timer.function = idle_worker_timeout;
    54.  pool->idle_timer.data = (unsigned long)pool;
    55.  // (1.5) 在worker_pool创建新的worker时,检查是否超时的timer
    56.  setup_timer(&pool->mayday_timer, pool_mayday_timeout,
    57.       (unsigned long)pool);
    58.  mutex_init(&pool->manager_arb);
    59.  mutex_init(&pool->attach_mutex);
    60.  INIT_LIST_HEAD(&pool->workers);
    61.  ida_init(&pool->worker_ida);
    62.  INIT_HLIST_NODE(&pool->hash_node);
    63.  pool->refcnt = 1;
    64.  /* shouldn't fail above this point */
    65.  pool->attrs = alloc_workqueue_attrs(GFP_KERNEL);
    66.  if (!pool->attrs)
    67.   return -ENOMEM;
    68.  return 0;
    69. }
    70. | →
    71. static struct worker *create_worker(struct worker_pool *pool)
    72. {
    73.  struct worker *worker = NULL;
    74.  int id = -1;
    75.  char id_buf[16];
    76.  /* ID is needed to determine kthread name */
    77.  id = ida_simple_get(&pool->worker_ida, 00, GFP_KERNEL);
    78.  if (id < 0)
    79.   goto fail;
    80.  worker = alloc_worker(pool->node);
    81.  if (!worker)
    82.   goto fail;
    83.  worker->pool = pool;
    84.  worker->id = id;
    85.  if (pool->cpu >= 0)
    86.   // (2.1) 给normal worker_pool的worker构造进程名
    87.   snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
    88.     pool->attrs->nice < 0  ? "H" : "");
    89.  else
    90.   // (2.2) 给unbound worker_pool的worker构造进程名
    91.   snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
    92.  // (2.3) 创建worker对应的内核进程
    93.  worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
    94.            "kworker/%s", id_buf);
    95.  if (IS_ERR(worker->task))
    96.   goto fail;
    97.  // (2.4) 设置内核进程对应的优先级nice
    98.  set_user_nice(worker->task, pool->attrs->nice);
    99.  /* prevent userland from meddling with cpumask of workqueue workers */
    100.  worker->task->flags |= PF_NO_SETAFFINITY;
    101.  // (2.5) 将worker和worker_pool绑定
    102.  /* successful, attach the worker to the pool */
    103.  worker_attach_to_pool(worker, pool);
    104.  // (2.6) 将worker初始状态设置成idle,
    105.  // wake_up_process以后,worker自动leave idle状态
    106.  /* start the newly created worker */
    107.  spin_lock_irq(&pool->lock);
    108.  worker->pool->nr_workers++;
    109.  worker_enter_idle(worker);
    110.  wake_up_process(worker->task);
    111.  spin_unlock_irq(&pool->lock);
    112.  return worker;
    113. fail:
    114.  if (id >= 0)
    115.   ida_simple_remove(&pool->worker_ida, id);
    116.  kfree(worker);
    117.  return NULL;
    118. }
    119. || →
    120. static void worker_attach_to_pool(struct worker *worker,
    121.        struct worker_pool *pool)
    122. {
    123.  mutex_lock(&pool->attach_mutex);
    124.  // (2.5.1) 将worker线程和cpu绑定
    125.  /*
    126.   * set_cpus_allowed_ptr() will fail if the cpumask doesn't have any
    127.   * online CPUs.  It'll be re-applied when any of the CPUs come up.
    128.   */
    129.  set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
    130.  /*
    131.   * The pool->attach_mutex ensures %POOL_DISASSOCIATED remains
    132.   * stable across this function.  See the comments above the
    133.   * flag definition for details.
    134.   */
    135.  if (pool->flags & POOL_DISASSOCIATED)
    136.   worker->flags |= WORKER_UNBOUND;
    137.  // (2.5.2) 将worker加入worker_pool链表
    138.  list_add_tail(&worker->node, &pool->workers);
    139.  mutex_unlock(&pool->attach_mutex);
    140. }

    1.1.2 unbound worker_pool

    大部分的work都是通过normal worker_pool来执行的(例如通过schedule_work()、schedule_work_on()压入到系统workqueue(system_wq)中的work),最后都是通过normal worker_pool中的worker来执行的。这些worker是和某个cpu绑定的,work一旦被worker开始执行,都是一直运行到某个cpu上的不会切换cpu。

    unbound worker_pool相对应的意思,就是worker可以在多个cpu上调度的。但是他其实也是绑定的,只不过它绑定的单位不是cpu而是node。所谓的node是对NUMA(Non Uniform Memory Access Architecture)系统来说的,NUMA可能存在多个node,每个node可能包含一个或者多个cpu。

    unbound worker_pool对应内核线程(worker_thread())的命名规则是这样的:

    1. snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
    2.  worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
    3.            "kworker/%s", id_buf);

    so类似名字是unbound worker_pool:

    1. shell@PRO5:/ $ ps | grep "kworker"
    2. root      23906 2     0      0     worker_thr 0000000000 S kworker/u20:2 // unbound pool 20的第2个worker进程
    3. root      24564 2     0      0     worker_thr 0000000000 S kworker/u20:0 // unbound pool 20的第0个worker进程
    4. root      24622 2     0      0     worker_thr 0000000000 S kworker/u21:1 // unbound pool 21的第1个worker进程

    unbound worker_pool也分成两类:

    • unbound_std_wq。每个node对应一个worker_pool,多个node就对应多个worker_pool;

    99a85596c1472823617534e21c16b5bc.png

    对应的拓扑图如下:

    133c3287040bb2709c18454448d8a923.png
    • ordered_wq。所有node对应一个default worker_pool;

    a9dbd75eaffb507d31465cc9fcf7c9f3.png

    对应的拓扑图如下:

    5686c7e7a9e5d390dd6b58709c4eb3b6.png

    以下是unbound worker_pool详细的创建过程代码分析:

    • kernel/workqueue.c:

    • init_workqueues() -> unbound_std_wq_attrs/ordered_wq_attrs

    1. static int __init init_workqueues(void)
    2. {
    3.  // (1) 初始化normal和high nice对应的unbound attrs
    4.  /* create default unbound and ordered wq attrs */
    5.  for (i = 0; i < NR_STD_WORKER_POOLS; i++) {
    6.   struct workqueue_attrs *attrs;
    7.   // (2) unbound_std_wq_attrs
    8.   BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL)));
    9.   attrs->nice = std_nice[i];
    10.   unbound_std_wq_attrs[i] = attrs;
    11.   /*
    12.    * An ordered wq should have only one pwq as ordering is
    13.    * guaranteed by max_active which is enforced by pwqs.
    14.    * Turn off NUMA so that dfl_pwq is used for all nodes.
    15.    */
    16.   // (3) ordered_wq_attrs,no_numa = true;
    17.   BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL)));
    18.   attrs->nice = std_nice[i];
    19.   attrs->no_numa = true;
    20.   ordered_wq_attrs[i] = attrs;
    21.  }
    22. }
    • kernel/workqueue.c:

    • __alloc_workqueue_key() -> alloc_and_link_pwqs() -> apply_workqueue_attrs() -> alloc_unbound_pwq()/numa_pwq_tbl_install()

    1. struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
    2.             unsigned int flags,
    3.             int max_active,
    4.             struct lock_class_key *key,
    5.             const char *lock_name, ...)
    6. {
    7.  size_t tbl_size = 0;
    8.  va_list args;
    9.  struct workqueue_struct *wq;
    10.  struct pool_workqueue *pwq;
    11.  /* see the comment above the definition of WQ_POWER_EFFICIENT */
    12.  if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)
    13.   flags |= WQ_UNBOUND;
    14.  /* allocate wq and format name */
    15.  if (flags & WQ_UNBOUND)
    16.   tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);
    17.  // (1) 分配workqueue_struct数据结构
    18.  wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
    19.  if (!wq)
    20.   return NULL;
    21.  if (flags & WQ_UNBOUND) {
    22.   wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL);
    23.   if (!wq->unbound_attrs)
    24.    goto err_free_wq;
    25.  }
    26.  va_start(args, lock_name);
    27.  vsnprintf(wq->name, sizeof(wq->name), fmt, args);
    28.  va_end(args);
    29.  // (2) pwq最多放到worker_pool中的work数
    30.  max_active = max_active ?: WQ_DFL_ACTIVE;
    31.  max_active = wq_clamp_max_active(max_active, flags, wq->name);
    32.  /* init wq */
    33.  wq->flags = flags;
    34.  wq->saved_max_active = max_active;
    35.  mutex_init(&wq->mutex);
    36.  atomic_set(&wq->nr_pwqs_to_flush, 0);
    37.  INIT_LIST_HEAD(&wq->pwqs);
    38.  INIT_LIST_HEAD(&wq->flusher_queue);
    39.  INIT_LIST_HEAD(&wq->flusher_overflow);
    40.  INIT_LIST_HEAD(&wq->maydays);
    41.  lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
    42.  INIT_LIST_HEAD(&wq->list);
    43.  // (3) 给workqueue分配对应的pool_workqueue
    44.  // pool_workqueue将workqueue和worker_pool链接起来
    45.  if (alloc_and_link_pwqs(wq) < 0)
    46.   goto err_free_wq;
    47.  // (4) 如果是WQ_MEM_RECLAIM类型的workqueue
    48.  // 创建对应的rescuer_thread()内核进程
    49.  /*
    50.   * Workqueues which may be used during memory reclaim should
    51.   * have a rescuer to guarantee forward progress.
    52.   */
    53.  if (flags & WQ_MEM_RECLAIM) {
    54.   struct worker *rescuer;
    55.   rescuer = alloc_worker(NUMA_NO_NODE);
    56.   if (!rescuer)
    57.    goto err_destroy;
    58.   rescuer->rescue_wq = wq;
    59.   rescuer->task = kthread_create(rescuer_thread, rescuer, "%s",
    60.             wq->name);
    61.   if (IS_ERR(rescuer->task)) {
    62.    kfree(rescuer);
    63.    goto err_destroy;
    64.   }
    65.   wq->rescuer = rescuer;
    66.   rescuer->task->flags |= PF_NO_SETAFFINITY;
    67.   wake_up_process(rescuer->task);
    68.  }
    69.  // (5) 如果是需要,创建workqueue对应的sysfs文件
    70.  if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq))
    71.   goto err_destroy;
    72.  /*
    73.   * wq_pool_mutex protects global freeze state and workqueues list.
    74.   * Grab it, adjust max_active and add the new @wq to workqueues
    75.   * list.
    76.   */
    77.  mutex_lock(&wq_pool_mutex);
    78.  mutex_lock(&wq->mutex);
    79.  for_each_pwq(pwq, wq)
    80.   pwq_adjust_max_active(pwq);
    81.  mutex_unlock(&wq->mutex);
    82.  // (6) 将新的workqueue加入到全局链表workqueues中
    83.  list_add(&wq->list, &workqueues);
    84.  mutex_unlock(&wq_pool_mutex);
    85.  return wq;
    86. err_free_wq:
    87.  free_workqueue_attrs(wq->unbound_attrs);
    88.  kfree(wq);
    89.  return NULL;
    90. err_destroy:
    91.  destroy_workqueue(wq);
    92.  return NULL;
    93. }
    94. | →
    95. static int alloc_and_link_pwqs(struct workqueue_struct *wq)
    96. {
    97.  bool highpri = wq->flags & WQ_HIGHPRI;
    98.  int cpu, ret;
    99.  // (3.1) normal workqueue
    100.  // pool_workqueue链接workqueue和worker_pool的过程
    101.  if (!(wq->flags & WQ_UNBOUND)) {
    102.   // 给workqueue的每个cpu分配对应的pool_workqueue,赋值给wq->cpu_pwqs
    103.   wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
    104.   if (!wq->cpu_pwqs)
    105.    return -ENOMEM;
    106.   for_each_possible_cpu(cpu) {
    107.    struct pool_workqueue *pwq =
    108.     per_cpu_ptr(wq->cpu_pwqs, cpu);
    109.    struct worker_pool *cpu_pools =
    110.     per_cpu(cpu_worker_pools, cpu);
    111.    // 将初始化时已经创建好的normal worker_pool,赋值给pool_workqueue
    112.    init_pwq(pwq, wq, &cpu_pools[highpri]);
    113.    mutex_lock(&wq->mutex);
    114.    // 将pool_workqueue和workqueue链接起来
    115.    link_pwq(pwq);
    116.    mutex_unlock(&wq->mutex);
    117.   }
    118.   return 0;
    119.  } else if (wq->flags & __WQ_ORDERED) {
    120.  // (3.2) unbound ordered_wq workqueue
    121.  // pool_workqueue链接workqueue和worker_pool的过程
    122.   ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
    123.   /* there should only be single pwq for ordering guarantee */
    124.   WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
    125.          wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
    126.        "ordering guarantee broken for workqueue %s\n", wq->name);
    127.   return ret;
    128.  } else {
    129.  // (3.3) unbound unbound_std_wq workqueue
    130.  // pool_workqueue链接workqueue和worker_pool的过程
    131.   return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
    132.  }
    133. }
    134. || →
    135. int apply_workqueue_attrs(struct workqueue_struct *wq,
    136.      const struct workqueue_attrs *attrs)
    137. {
    138.  // (3.2.1) 根据的ubound的ordered_wq_attrs/unbound_std_wq_attrs
    139.  // 创建对应的pool_workqueue和worker_pool
    140.  // 其中worker_pool不是默认创建好的,是需要动态创建的,对应的worker内核进程也要重新创建
    141.  // 创建好的pool_workqueue赋值给pwq_tbl[node]
    142.  /*
    143.   * If something goes wrong during CPU up/down, we'll fall back to
    144.   * the default pwq covering whole @att- kernel/workqueue.c:  
    145. - __alloc_workqueue_key() -> alloc_and_link_pwqs() -> apply_workqueue_attrs() -> alloc_unbound_pwq()/numa_pwq_tbl_install()rs->cpumask.  Always create
    146.   * it even if we don't use it immediately.
    147.   */
    148.  dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
    149.  if (!dfl_pwq)
    150.   goto enomem_pwq;
    151.  for_each_node(node) {
    152.   if (wq_calc_node_cpumask(attrs, node, -1, tmp_attrs->cpumask)) {
    153.    pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
    154.    if (!pwq_tbl[node])
    155.     goto enomem_pwq;
    156.   } else {
    157.    dfl_pwq->refcnt++;
    158.    pwq_tbl[node] = dfl_pwq;
    159.   }
    160.  }
    161.  /* save the previous pwq and install the new one */
    162.  // (3.2.2) 将临时pwq_tbl[node]赋值给wq->numa_pwq_tbl[node]
    163.  for_each_node(node)
    164.   pwq_tbl[node] = numa_pwq_tbl_install(wq, node, pwq_tbl[node]);
    165. }
    166. ||| →
    167. static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
    168.      const struct workqueue_attrs *attrs)
    169. {
    170.  struct worker_pool *pool;
    171.  struct pool_workqueue *pwq;
    172.  lockdep_assert_held(&wq_pool_mutex);
    173.  // (3.2.1.1) 如果对应attrs已经创建多对应的unbound_pool,则使用已有的unbound_pool
    174.  // 否则根据attrs创建新的unbound_pool
    175.  pool = get_unbound_pool(attrs);
    176.  if (!pool)
    177.   return NULL;
    178.  pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
    179.  if (!pwq) {
    180.   put_unbound_pool(pool);
    181.   return NULL;
    182.  }
    183.  init_pwq(pwq, wq, pool);
    184.  return pwq;
    185. }

    1.2 worker

    每个worker对应一个worker_thread()内核线程,一个worker_pool对应一个或者多个worker。多个worker从同一个链表中worker_pool->worklist获取work进行处理。

    所以这其中有几个重点:

    • worker怎么处理work;

    • worker_pool怎么动态管理worker的数量;

    1.2.1 worker处理work

    处理work的过程主要在worker_thread() -> process_one_work()中处理,我们具体看看代码的实现过程。

    • kernel/workqueue.c:

    • worker_thread() -> process_one_work()

    1. static int worker_thread(void *__worker)
    2. {
    3.  struct worker *worker = __worker;
    4.  struct worker_pool *pool = worker->pool;
    5.  /* tell the scheduler that this is a workqueue worker */
    6.  worker->task->flags |= PF_WQ_WORKER;
    7. woke_up:
    8.  spin_lock_irq(&pool->lock);
    9.  // (1) 是否die
    10.  /* am I supposed to die? */
    11.  if (unlikely(worker->flags & WORKER_DIE)) {
    12.   spin_unlock_irq(&pool->lock);
    13.   WARN_ON_ONCE(!list_empty(&worker->entry));
    14.   worker->task->flags &= ~PF_WQ_WORKER;
    15.   set_task_comm(worker->task, "kworker/dying");
    16.   ida_simple_remove(&pool->worker_ida, worker->id);
    17.   worker_detach_from_pool(worker, pool);
    18.   kfree(worker);
    19.   return 0;
    20.  }
    21.  // (2) 脱离idle状态
    22.  // 被唤醒之前worker都是idle状态
    23.  worker_leave_idle(worker);
    24. recheck:
    25.  
    26.  // (3) 如果需要本worker继续执行则继续,否则进入idle状态
    27.  // need more worker的条件: (pool->worklist != 0) && (pool->nr_running == 0)
    28.  // worklist上有work需要执行,并且现在没有处于running的work
    29.  /* no more worker necessary? */
    30.  if (!need_more_worker(pool))
    31.   goto sleep;
    32.  // (4) 如果(pool->nr_idle == 0),则启动创建更多的worker
    33.  // 说明idle队列中已经没有备用worker了,先创建 一些worker备用
    34.  /* do we need to manage? */
    35.  if (unlikely(!may_start_working(pool)) && manage_workers(worker))
    36.   goto recheck;
    37.  /*
    38.   * ->scheduled list can only be filled while a worker is
    39.   * preparing to process a work or actually processing it.
    40.   * Make sure nobody diddled with it while I was sleeping.
    41.   */
    42.  WARN_ON_ONCE(!list_empty(&worker->scheduled));
    43.  /*
    44.   * Finish PREP stage.  We're guaranteed to have at least one idle
    45.   * worker or that someone else has already assumed the manager
    46.   * role.  This is where @worker starts participating in concurrency
    47.   * management if applicable and concurrency management is restored
    48.   * after being rebound.  See rebind_workers() for details.
    49.   */
    50.  worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
    51.  do {
    52.   // (5) 如果pool->worklist不为空,从其中取出一个work进行处理
    53.   struct work_struct *work =
    54.    list_first_entry(&pool->worklist,
    55.       struct work_struct, entry);
    56.   if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
    57.    /* optimization path, not strictly necessary */
    58.    // (6) 执行正常的work
    59.    process_one_work(worker, work);
    60.    if (unlikely(!list_empty(&worker->scheduled)))
    61.     process_scheduled_works(worker);
    62.   } else {
    63.    // (7) 执行系统特意scheduled给某个worker的work
    64.    // 普通的work是放在池子的公共list中的pool->worklist
    65.    // 只有一些特殊的work被特意派送给某个worker的worker->scheduled
    66.    // 包括:1、执行flush_work时插入的barrier work;
    67.    // 2、collision时从其他worker推送到本worker的work
    68.    move_linked_works(work, &worker->scheduled, NULL);
    69.    process_scheduled_works(worker);
    70.   }
    71.  // (8) worker keep_working的条件:
    72.  // pool->worklist不为空 && (pool->nr_running <= 1)
    73.  } while (keep_working(pool));
    74.  worker_set_flags(worker, WORKER_PREP);supposed
    75. sleep:
    76.  // (9) worker进入idle状态
    77.  /*
    78.   * pool->lock is held and there's no work to process and no need to
    79.   * manage, sleep.  Workers are woken up only while holding
    80.   * pool->lock or from local cpu, so setting the current state
    81.   * before releasing pool->lock is enough to prevent losing any
    82.   * event.
    83.   */
    84.  worker_enter_idle(worker);
    85.  __set_current_state(TASK_INTERRUPTIBLE);
    86.  spin_unlock_irq(&pool->lock);
    87.  schedule();
    88.  goto woke_up;
    89. }
    90. | →
    91. static void process_one_work(struct worker *worker, struct work_struct *work)
    92. __releases(&pool->lock)
    93. __acquires(&pool->lock)
    94. {
    95.  struct pool_workqueue *pwq = get_work_pwq(work);
    96.  struct worker_pool *pool = worker->pool;
    97.  bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;
    98.  int work_color;
    99.  struct worker *collision;
    100. #ifdef CONFIG_LOCKDEP
    101.  /*
    102.   * It is permissible to free the struct work_struct from
    103.   * inside the function that is called from it, this we need to
    104.   * take into account for lockdep too.  To avoid bogus "held
    105.   * lock freed" warnings as well as problems when looking into
    106.   * work->lockdep_map, make a copy and use that here.
    107.   */
    108.  struct lockdep_map lockdep_map;
    109.  lockdep_copy_map(&lockdep_map, &work->lockdep_map);
    110. #endif
    111.  /* ensure we're on the correct CPU */
    112.  WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
    113.        raw_smp_processor_id() != pool->cpu);
    114.  // (8.1) 如果work已经在worker_pool的其他worker上执行,
    115.  // 将work放入对应worker的scheduled队列中延后执行
    116.  /*
    117.   * A single work shouldn't be executed concurrently by
    118.   * multiple workers on a single cpu.  Check whether anyone is
    119.   * already processing the work.  If so, defer the work to the
    120.   * currently executing one.
    121.   */
    122.  collision = find_worker_executing_work(pool, work);
    123.  if (unlikely(collision)) {
    124.   move_linked_works(work, &collision->scheduled, NULL);
    125.   return;
    126.  }
    127.  // (8.2) 将worker加入busy队列pool->busy_hash
    128.  /* claim and dequeue */
    129.  debug_work_deactivate(work);
    130.  hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work);
    131.  worker->current_work = work;
    132.  worker->current_func = work->func;
    133.  worker->current_pwq = pwq;
    134.  work_color = get_work_color(work);
    135.  list_del_init(&work->entry);
    136.  // (8.3) 如果work所在的wq是cpu密集型的WQ_CPU_INTENSIVE
    137.  // 则当前work的执行脱离worker_pool的动态调度,成为一个独立的线程
    138.  /*
    139.   * CPU intensive works don't participate in concurrency management.
    140.   * They're the scheduler's responsibility.  This takes @worker out
    141.   * of concurrency management and the next code block will chain
    142.   * execution of the pending work items.
    143.   */
    144.  if (unlikely(cpu_intensive))
    145.   worker_set_flags(worker, WORKER_CPU_INTENSIVE);
    146.  // (8.4) 在UNBOUND或者CPU_INTENSIVE work中判断是否需要唤醒idle worker
    147.  // 普通work不会执行这个操作
    148.  /*
    149.   * Wake up another worker if necessary.  The condition is always
    150.   * false for normal per-cpu workers since nr_running would always
    151.   * be >= 1 at this point.  This is used to chain execution of the
    152.   * pending work items for WORKER_NOT_RUNNING workers such as the
    153.   * UNBOUND and CPU_INTENSIVE ones.
    154.   */
    155.  if (need_more_worker(pool))
    156.   wake_up_worker(pool);
    157.  /*
    158.   * Record the last pool and clear PENDING which should be the last
    159.   * update to @work.  Also, do this inside @pool->lock so that
    160.   * PENDING and queued state changes happen together while IRQ is
    161.   * disabled.
    162.   */
    163.  set_work_pool_and_clear_pending(work, pool->id);
    164.  spin_unlock_irq(&pool->lock);
    165.  lock_map_acquire_read(&pwq->wq->lockdep_map);
    166.  lock_map_acquire(&lockdep_map);
    167.  trace_workqueue_execute_start(work);
    168.  // (8.5) 执行work函数
    169.  worker->current_func(work);
    170.  /*
    171.   * While we must be careful to not use "work" after this, the trace
    172.   * point will only record its address.
    173.   */
    174.  trace_workqueue_execute_end(work);
    175.  lock_map_release(&lockdep_map);
    176.  lock_map_release(&pwq->wq->lockdep_map);
    177.  if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
    178.   pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n"
    179.          "     last function: %pf\n",
    180.          current->comm, preempt_count(), task_pid_nr(current),
    181.          worker->current_func);
    182.   debug_show_held_locks(current);
    183.   dump_stack();
    184.  }
    185.  /*
    186.   * The following prevents a kworker from hogging CPU on !PREEMPT
    187.   * kernels, where a requeueing work item waiting for something to
    188.   * happen could deadlock with stop_machine as such work item could
    189.   * indefinitely requeue itself while all other CPUs are trapped in
    190.   * stop_machine. At the same time, report a quiescent RCU state so
    191.   * the same condition doesn't freeze RCU.
    192.   */
    193.  cond_resched_rcu_qs();
    194.  spin_lock_irq(&pool->lock);
    195.  /* clear cpu intensive status */
    196.  if (unlikely(cpu_intensive))
    197.   worker_clr_flags(worker, WORKER_CPU_INTENSIVE);
    198.  /* we're done with it, release */
    199.  hash_del(&worker->hentry);
    200.  worker->current_work = NULL;
    201.  worker->current_func = NULL;
    202.  worker->current_pwq = NULL;
    203.  worker->desc_valid = false;
    204.  pwq_dec_nr_in_flight(pwq, work_color);
    205. }

    1.2.2 worker_pool动态管理worker

    worker_pool怎么来动态增减worker,这部分的算法是CMWQ的核心。其思想如下:

    • worker_pool中的worker有3种状态:idle、running、suspend;

    • 如果worker_pool中有work需要处理,保持至少一个runn- kernel/workqueue.c:

    • worker_thread() -> process_one_work() ing worker来处理;

    • running worker在处理work的过程中进入了阻塞suspend状态,为了保持其他work的执行,需要唤醒新的idle worker来处理work;

    • 如果有work需要执行且running worker大于1个,会让多余的running worker进入idle状态;

    • 如果没有work需要执行,会让所有worker进入idle状态;

    • 如果创建的worker过多,destroy_worker在300s(IDLE_WORKER_TIMEOUT)时间内没有再次运行的idle worker。

    26531075eb59f3cda2ef8778cb33eb2d.png

    详细代码可以参考上节worker_thread() -> process_one_work()的分析。

    为了追踪worker的running和suspend状态,用来动态调整worker的数量。wq使用在进程调度中加钩子函数的技巧:

    • 追踪worker从suspend进入running状态:ttwu_activate() -> wq_worker_waking_up()

    1. void wq_worker_waking_up(struct task_struct *task, int cpu)
    2. {
    3.  struct worker *worker = kthread_data(task);
    4.  if (!(worker->flags & WORKER_NOT_RUNNING)) {
    5.   WARN_ON_ONCE(worker->pool->cpu != cpu);
    6.   // 增加worker_pool中running的worker数量
    7.   atomic_inc(&worker->pool->nr_running);
    8.  }
    9. }
    • 追踪worker从running进入suspend状态:__schedule() -> wq_worker_sleeping()

    1. struct task_struct *wq_worker_sleeping(struct task_struct *task, int cpu)
    2. {
    3.  struct worker *worker = kthread_data(task), *to_wakeup = NULL;
    4.  struct worker_pool *pool;
    5.  /*
    6.   * Rescuers, which may not have all the fields set up like normal
    7.   * workers, also reach here, let's not access anything before
    8.   * checking NOT_RUNNING.
    9.   */
    10.  if (worker->flags & WORKER_NOT_RUNNING)
    11.   return NULL;
    12.  pool = worker->pool;
    13.  /* this can only happen on the local cpu */
    14.  if (WARN_ON_ONCE(cpu != raw_smp_processor_id() || pool->cpu != cpu))
    15.   return NULL;
    16.  /*
    17.   * The counterpart of the following dec_and_test, implied mb,
    18.   * worklist not empty test sequence is in insert_work().
    19.   * Please read comment there.
    20.   *
    21.   * NOT_RUNNING is clear.  This means that we're bound to and
    22.   * running on the local cpu w/ rq lock held and preemption
    23.   * disabled, which in turn means that none else could be
    24.   * manipulating idle_list, so dereferencing idle_list without pool
    25.   * lock is safe.
    26.   */
    27.  // 减少worker_pool中running的worker数量
    28.  // 如果worklist还有work需要处理,唤醒第一个idle worker进行处理
    29.  if (atomic_dec_and_test(&pool->nr_running) &&
    30.      !list_empty(&pool->worklist))
    31.   to_wakeup = first_idle_worker(pool);
    32.  return to_wakeup ? to_wakeup->task : NULL;
    33. }

    这里worker_pool的调度思想是:如果有work需要处理,保持一个running状态的worker处理,不多也不少。

    但是这里有一个问题如果work是cpu密集型的,它虽然也没有进入suspend状态,但是会长时间的占用cpu,让后续的work阻塞太长时间。

    为了解决这个问题,CMWQ设计了WQ_CPU_INTENSIVE,如果一个wq声明自己是CPU_INTENSIVE,则让当前worker脱离动态调度,像是进入了suspend状态,那么CMWQ会创建新的worker,后续的work会得到执行。

    • kernel/workqueue.c:

    • worker_thread() -> process_one_work()

    1. static void process_one_work(struct worker *worker, struct work_struct *work)
    2. __releases(&pool->lock)
    3. __acquires(&pool->lock)
    4. {
    5.  bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;
    6.  // (1) 设置当前worker的WORKER_CPU_INTENSIVE标志
    7.  // nr_running会被减1
    8.  // 对worker_pool来说,当前worker相当于进入了suspend状态
    9.  /*
    10.   * CPU intensive works don't participate in concurrency management.
    11.   * They're the scheduler's responsibility.  This takes @worker out
    12.   * of concurrency management and the next code block will chain
    13.   * execution of the pending work items.
    14.   */
    15.  if (unlikely(cpu_intensive))
    16.   worker_set_flags(worker, WORKER_CPU_INTENSIVE);
    17.  // (2) 接上一步,判断是否需要唤醒新的worker来处理work
    18.  /*
    19.   * Wake up another worker if necessary.  The condition is always
    20.   * false for normal per-cpu workers since nr_running would always
    21.   * be >= 1 at this point.  This is used to chain execution of the
    22.   * pending work items for WORKER_NOT_RUNNING workers such as the
    23.   * UNBOUND and CPU_INTENSIVE ones.
    24.   */
    25.  if (need_more_worker(pool))
    26.   wake_up_worker(pool);
    27.  // (3) 执行work
    28.  worker->current_func(work);
    29.  // (4) 执行完,清理当前worker的WORKER_CPU_INTENSIVE标志
    30.  // 当前worker重新进入running状态
    31.  /* clear cpu intensive status */
    32.  if (unlikely(cpu_intensive))
    33.   worker_clr_flags(worker, WORKER_CPU_INTENSIVE);
    34. }
    35.  WORKER_NOT_RUNNING = WORKER_PREP | WORKER_CPU_INTENSIVE |
    36.       WORKER_UNBOUND | WORKER_REBOUND,
    37. static inline void worker_set_flags(struct worker *worker, unsigned int flags)
    38. {
    39.  struct worker_pool *pool = worker->pool;
    40.  WARN_ON_ONCE(worker->task != current);
    41.  /* If transitioning into NOT_RUNNING, adjust nr_running. */
    42.  if ((flags & WORKER_NOT_RUNNING) &&
    43.      !(worker->flags & WORKER_NOT_RUNNING)) {
    44.   atomic_dec(&pool->nr_running);
    45.  }
    46.  worker->flags |= flags;
    47. }
    48. static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
    49. {
    50.  struct worker_pool *pool = worker->pool;
    51.  unsigned int oflags = worker->flags;
    52.  WARN_ON_ONCE(worker->task != current);
    53.  worker->flags &= ~flags;
    54.  /*
    55.   * If transitioning out of NOT_RUNNING, increment nr_running.  Note
    56.   * that the nested NOT_RUNNING is not a noop.  NOT_RUNNING is mask
    57.   * of multiple flags, not a single flag.
    58.   */
    59.  if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING))
    60.   if (!(worker->flags & WORKER_NOT_RUNNING))
    61.    atomic_inc(&pool->nr_running);
    62. }

    1.2.3 cpu hotplug处理

    从上几节可以看到,系统会创建和cpu绑定的normal worker_pool和不绑定cpu的unbound worker_pool,worker_pool又会动态的创建worker。

    那么在cpu hotplug的时候,会怎么样动态的处理worker_pool和worker呢?来看具体的代码分析:

    • kernel/workqueue.c:

    • workqueue_cpu_up_callback()/workqueue_cpu_down_callback()

    1. static int __init init_workqueues(void)
    2. {
    3.  cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP);
    4.  hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN);
    5. }
    6. | →
    7. static int workqueue_cpu_down_callback(struct notifier_block *nfb,
    8.        unsigned long action,
    9.        void *hcpu)
    10. {
    11.  int cpu = (unsigned long)hcpu;
    12.  struct work_struct unbind_work;
    13.  struct workqueue_struct *wq;
    14.  switch (action & ~CPU_TASKS_FROZEN) {
    15.  case CPU_DOWN_PREPARE:
    16.   /* unbinding per-cpu workers should happen on the local CPU */
    17.   INIT_WORK_ONSTACK(&unbind_work, wq_unbind_fn);
    18.   // (1) cpu down_prepare
    19.   // 把和当前cpu绑定的normal worker_pool上的worker停工
    20.   // 随着当前cpu被down掉,这些worker会迁移到其他cpu上
    21.   queue_work_on(cpu, system_highpri_wq, &unbind_work);
    22.   // (2) unbound wq对cpu变化的更新
    23.   /* update NUMA affinity of unbound workqueues */
    24.   mutex_lock(&wq_pool_mutex);
    25.   list_for_each_entry(wq, &workqueues, list)
    26.    wq_update_unbound_numa(wq, cpu, false);
    27.   mutex_unlock(&wq_pool_mutex);
    28.   /* wait for per-cpu unbinding to finish */
    29.   flush_work(&unbind_work);
    30.   destroy_work_on_stack(&unbind_work);
    31.   break;
    32.  }
    33.  return NOTIFY_OK;
    34. }
    35. | →
    36. static int workqueue_cpu_up_callback(struct notifier_block *nfb,
    37.             unsigned long action,
    38.             void *hcpu)
    39. {
    40.  int cpu = (unsigned long)hcpu;
    41.  struct worker_pool *pool;
    42.  struct workqueue_struct *wq;
    43.  int pi;
    44.  switch (action & ~CPU_TASKS_FROZEN) {
    45.  case CPU_UP_PREPARE:
    46.   for_each_cpu_worker_pool(pool, cpu) {
    47.    if (pool->nr_workers)
    48.     continue;
    49.    if (!create_worker(pool))
    50.     return NOTIFY_BAD;
    51.   }
    52.   break;
    53.  case CPU_DOWN_FAILED:
    54.  case CPU_ONLINE:
    55.   mutex_lock(&wq_pool_mutex);
    56.   
    57.   // (3) cpu up
    58.   for_each_pool(pool, pi) {
    59.    mutex_lock(&pool->attach_mutex);
    60.    // 如果和当前cpu绑定的normal worker_pool上,有WORKER_UNBOUND停工的worker
    61.    // 重新绑定worker到worker_pool
    62.    // 让这些worker开工,并绑定到当前cpu
    63.    if (pool->cpu == cpu)
    64.     rebind_workers(pool);
    65.    else if (pool->cpu < 0)
    66.     restore_unbound_workers_cpumask(pool, cpu);
    67.    mutex_unlock(&pool->attach_mutex);
    68.   }
    69.   /* update NUMA affinity of unbound workqueues */
    70.   list_for_each_entry(wq, &workqueues, list)
    71.    wq_update_unbound_numa(wq, cpu, true);
    72.   mutex_unlock(&wq_pool_mutex);
    73.   break;
    74.  }
    75.  return NOTIFY_OK;
    76. }

    1.3 workqueue

    workqueue就是存放一组work的集合,基本可以分为两类:一类系统创建的workqueue,一类是用户自己创建的workqueue。

    不论是系统还是用户workqueue,如果没有指定WQ_UNBOUND,默认都是和normal worker_pool绑定。

    1.3.1 系统workqueue

    系统在初始化时创建了一批默认的workqueue:system_wq、system_highpri_wq、system_long_wq、system_unbound_wq、system_freezable_wq、system_power_efficient_wq、system_freezable_power_efficient_wq。

    像system_wq,就是schedule_work()默认使用的。

    • kernel/workqueue.c:

    • init_workqueues()

    1. static int __init init_workqueues(void)
    2. {
    3.  system_wq = alloc_workqueue("events"00);
    4.  system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
    5.  system_long_wq = alloc_workqueue("events_long"00);
    6.  system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
    7.          WQ_UNBOUND_MAX_ACTIVE);
    8.  system_freezable_wq = alloc_workqueue("events_freezable",
    9.            WQ_FREEZABLE, 0);
    10.  system_power_efficient_wq = alloc_workqueue("events_power_efficient",
    11.            WQ_POWER_EFFICIENT, 0);
    12.  system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient",
    13.            WQ_FREEZABLE | WQ_POWER_EFFICIENT,
    14.            0);
    15. }

    1.3.2 workqueue创建

    详细过程见上几节的代码分析:alloc_workqueue() -> __alloc_workqueue_key() -> alloc_and_link_pwqs()。

    1.3.3 flush_workqueue()

    这一部分的逻辑,wq->work_color、wq->flush_color换来换去的逻辑实在看的头晕。看不懂暂时不想看,放着以后看吧,或者有谁看懂了教我一下。:)

    1.4 pool_workqueue

    pool_workqueue只是一个中介角色。

    详细过程见上几节的代码分析:alloc_workqueue() -> __alloc_workqueue_key() -> alloc_and_link_pwqs()。

    1.5 work

    描述一份待执行的工作。

    1.5.1 queue_work()

    将work压入到workqueue当中。

    • kernel/workqueue.c:

    • queue_work() -> queue_work_on() -> __queue_work()

    1. static void __queue_work(int cpu, struct workqueue_struct *wq,
    2.     struct work_struct *work)
    3. {
    4.  struct pool_workqueue *pwq;
    5.  struct worker_pool *last_pool;
    6.  struct list_head *worklist;
    7.  unsigned int work_flags;
    8.  unsigned int req_cpu = cpu;
    9.  /*
    10.   * While a work item is PENDING && off queue, a task trying to
    11.   * steal the PENDING will busy-loop waiting for it to either get
    12.   * queued or lose PENDING.  Grabbing PENDING and queueing should
    13.   * happen with IRQ disabled.
    14.   */
    15.  WARN_ON_ONCE(!irqs_disabled());
    16.  debug_work_activate(work);
    17.  /* if draining, only works from the same workqueue are allowed */
    18.  if (unlikely(wq->flags & __WQ_DRAINING) &&
    19.      WARN_ON_ONCE(!is_chained_work(wq)))
    20.   return;
    21. retry:
    22.  // (1) 如果没有指定cpu,则使用当前cpu
    23.  if (req_cpu == WORK_CPU_UNBOUND)
    24.   cpu = raw_smp_processor_id();
    25.  /* pwq which will be used unless @work is executing elsewhere */
    26.  if (!(wq->flags & WQ_UNBOUND))
    27.   // (2) 对于normal wq,使用当前cpu对应的normal worker_pool
    28.   pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
    29.  else
    30.   // (3) 对于unbound wq,使用当前cpu对应node的worker_pool
    31.   pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
    32.  // (4) 如果work在其他worker上正在被执行,把work压到对应的worker上去
    33.  // 避免work出现重入的问题
    34.  /*
    35.   * If @work was previously on a different pool, it might still be
    36.   * running there, in which case the work needs to be queued on that
    37.   * pool to guarantee non-reentrancy.
    38.   */
    39.  last_pool = get_work_pool(work);
    40.  if (last_pool && last_pool != pwq->pool) {
    41.   struct worker *worker;
    42.   spin_lock(&last_pool->lock);
    43.   worker = find_worker_executing_work(last_pool, work);
    44.   if (worker && worker->current_pwq->wq == wq) {
    45.    pwq = worker->current_pwq;
    46.   } else {
    47.    /* meh... not running there, queue here */
    48.    spin_unlock(&last_pool->lock);
    49.    spin_lock(&pwq->pool->lock);
    50.   }
    51.  } else {
    52.   spin_lock(&pwq->pool->lock);
    53.  }
    54.  /*
    55.   * pwq is determined and locked.  For unbound pools, we could have
    56.   * raced with pwq release and it could already be dead.  If its
    57.   * refcnt is zero, repeat pwq selection.  Note that pwqs never die
    58.   * without another pwq replacing it in the numa_pwq_tbl or while
    59.   * work items are executing on it, so the retrying is guaranteed to
    60.   * make forward-progress.
    61.   */
    62.  if (unlikely(!pwq->refcnt)) {
    63.   if (wq->flags & WQ_UNBOUND) {
    64.    spin_unlock(&pwq->pool->lock);
    65.    cpu_relax();
    66.    goto retry;
    67.   }
    68.   /* oops */
    69.   WARN_ONCE(true"workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt",
    70.      wq->name, cpu);
    71.  }
    72.  /* pwq determined, queue */
    73.  trace_workqueue_queue_work(req_cpu, pwq, work);
    74.  if (WARN_ON(!list_empty(&work->entry))) {
    75.   spin_unlock(&pwq->pool->lock);
    76.   return;
    77.  }
    78.  pwq->nr_in_flight[pwq->work_color]++;
    79.  work_flags = work_color_to_flags(pwq->work_color);
    80.  // (5) 如果还没有达到max_active,将work挂载到pool->worklist
    81.  if (likely(pwq->nr_active < pwq->max_active)) {
    82.   trace_workqueue_activate_work(work);
    83.   pwq->nr_active++;
    84.   worklist = &pwq->pool->worklist;
    85.  // 否则,将work挂载到临时队列pwq->delayed_works
    86.  } else {
    87.   work_flags |= WORK_STRUCT_DELAYED;
    88.   worklist = &pwq->delayed_works;
    89.  }
    90.  // (6) 将work压入worklist当中
    91.  insert_work(pwq, work, worklist, work_flags);
    92.  spin_unlock(&pwq->pool->lock);
    93. }

    1.5.2 flush_work()

    flush某个work,确保work执行完成。

    怎么判断异步的work已经执行完成?这里面使用了一个技巧:在目标work的后面插入一个新的work wq_barrier,如果wq_barrier执行完成,那么目标work肯定已经执行完成。

    • kernel/workqueue.c:

    • queue_work() -> queue_work_on() -> __queue_work()

    1. /**
    2.  * flush_work - wait for a work to finish executing the last queueing instance
    3.  * @work: the work to flush
    4.  *
    5.  * Wait until @work has finished execution.  @work is guaranteed to be idle
    6.  * on return if it hasn't been requeued since flush started.
    7.  *
    8.  * Return:
    9.  * %true if flush_work() waited for the work to finish execution,
    10.  * %false if it was already idle.
    11.  */
    12. bool flush_work(struct work_struct *work)
    13. {
    14.  struct wq_barrier barr;
    15.  lock_map_acquire(&work->lockdep_map);
    16.  lock_map_release(&work->lockdep_map);
    17.  if (start_flush_work(work, &barr)) {
    18.   // 等待barr work执行完成的信号
    19.   wait_for_completion(&barr.done);
    20.   destroy_work_on_stack(&barr.work);
    21.   return true;
    22.  } else {
    23.   return false;
    24.  }
    25. }
    26. | →
    27. static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr)
    28. {
    29.  struct worker *worker = NULL;
    30.  struct worker_pool *pool;
    31.  struct pool_workqueue *pwq;
    32.  might_sleep();
    33.  // (1) 如果work所在worker_pool为NULL,说明work已经执行完
    34.  local_irq_disable();
    35.  pool = get_work_pool(work);
    36.  if (!pool) {
    37.   local_irq_enable();
    38.   return false;
    39.  }
    40.  spin_lock(&pool->lock);
    41.  /* see the comment in try_to_grab_pending() with the same code */
    42.  pwq = get_work_pwq(work);
    43.  if (pwq) {
    44.   // (2) 如果work所在pwq指向的worker_pool不等于上一步得到的worker_pool,说明work已经执行完
    45.   if (unlikely(pwq->pool != pool))
    46.    goto already_gone;
    47.  } else {
    48.   // (3) 如果work所在pwq为NULL,并且也没有在当前执行的work中,说明work已经执行完
    49.   worker = find_worker_executing_work(pool, work);
    50.   if (!worker)
    51.    goto already_gone;
    52.   pwq = worker->current_pwq;
    53.  }
    54.  // (4) 如果work没有执行完,向work的后面插入barr work
    55.  insert_wq_barrier(pwq, barr, work, worker);
    56.  spin_unlock_irq(&pool->lock);
    57.  /*
    58.   * If @max_active is 1 or rescuer is in use, flushing another work
    59.   * item on the same workqueue may lead to deadlock.  Make sure the
    60.   * flusher is not running on the same workqueue by verifying write
    61.   * access.
    62.   */
    63.  if (pwq->wq->saved_max_active == 1 || pwq->wq->rescuer)
    64.   lock_map_acquire(&pwq->wq->lockdep_map);
    65.  else
    66.   lock_map_acquire_read(&pwq->wq->lockdep_map);
    67.  lock_map_release(&pwq->wq->lockdep_map);
    68.  return true;
    69. already_gone:
    70.  spin_unlock_irq(&pool->lock);
    71.  return false;
    72. }
    73. || →
    74. static void insert_wq_barrier(struct pool_workqueue *pwq,
    75.          struct wq_barrier *barr,
    76.          struct work_struct *target, struct worker *worker)
    77. {
    78.  struct list_head *head;
    79.  unsigned int linked = 0;
    80.  /*
    81.   * debugobject calls are safe here even with pool->lock locked
    82.   * as we know for sure that this will not trigger any of the
    83.   * checks and call back into the fixup functions where we
    84.   * might deadlock.
    85.   */
    86.  // (4.1) barr work的执行函数wq_barrier_func()
    87.  INIT_WORK_ONSTACK(&barr->work, wq_barrier_func);
    88.  __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
    89.  init_completion(&barr->done);
    90.  /*
    91.   * If @target is currently being executed, schedule the
    92.   * barrier to the worker; otherwise, put it after @target.
    93.   */
    94.  // (4.2) 如果work当前在worker中执行,则barr work插入scheduled队列
    95.  if (worker)
    96.   head = worker->scheduled.next;
    97.  // 否则,则barr work插入正常的worklist队列中,插入位置在目标work后面
    98.  // 并且置上WORK_STRUCT_LINKED标志
    99.  else {
    100.   unsigned long *bits = work_data_bits(target);
    101.   head = target->entry.next;
    102.   /* there can already be other linked works, inherit and set */
    103.   linked = *bits & WORK_STRUCT_LINKED;
    104.   __set_bit(WORK_STRUCT_LINKED_BIT, bits);
    105.  }
    106.  debug_work_activate(&barr->work);
    107.  insert_work(pwq, &barr->work, head,
    108.       work_color_to_flags(WORK_NO_COLOR) | linked);
    109. }
    110. ||| →
    111. static void wq_barrier_func(struct work_struct *work)
    112. {
    113.  struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
    114.  // (4.1.1) barr work执行完成,发出complete信号。
    115.  complete(&barr->done);
    116. }

    2.Workqueue对外接口函数

    CMWQ实现的workqueue机制,被包装成相应的对外接口函数。

    2.1 schedule_work()

    把work压入系统默认wq system_wq,WORK_CPU_UNBOUND指定worker为当前cpu绑定的normal worker_pool创建的worker。

    • kernel/workqueue.c:

    • schedule_work() -> queue_work_on() -> __queue_work()

    1. static inline bool schedule_work(struct work_struct *work)
    2. {
    3.  return queue_work(system_wq, work);
    4. }
    5. | →
    6. static inline bool queue_work(struct workqueue_struct *wq,
    7.          struct work_struct *work)
    8. {
    9.  return queue_work_on(WORK_CPU_UNBOUND, wq, work);
    10. }

    2.2 sschedule_work_on()

    在schedule_work()基础上,可以指定work运行的cpu。

    • kernel/workqueue.c:

    • schedule_work_on() -> queue_work_on() -> __queue_work()

    1. static inline bool schedule_work_on(int cpu, struct work_struct *work)
    2. {
    3.  return queue_work_on(cpu, system_wq, work);
    4. }

    2.3 schedule_delayed_work()

    启动一个timer,在timer定时到了以后调用delayed_work_timer_fn()把work压入系统默认wq system_wq。

    • kernel/workqueue.c:

    • schedule_work_on() -> queue_work_on() -> __queue_work()

    1. static inline bool schedule_delayed_work(struct delayed_work *dwork,
    2.       unsigned long delay)
    3. {
    4.  return queue_delayed_work(system_wq, dwork, delay);
    5. }
    6. | →
    7. static inline bool queue_delayed_work(struct workqueue_struct *wq,
    8.           struct delayed_work *dwork,
    9.           unsigned long delay)
    10. {
    11.  return queue_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay);
    12. }
    13. || →
    14. bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
    15.       struct delayed_work *dwork, unsigned long delay)
    16. {
    17.  struct work_struct *work = &dwork->work;
    18.  bool ret = false;
    19.  unsigned long flags;
    20.  /* read the comment in __queue_work() */
    21.  local_irq_save(flags);
    22.  if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
    23.   __queue_delayed_work(cpu, wq, dwork, delay);
    24.   ret = true;
    25.  }
    26.  local_irq_restore(flags);
    27.  return ret;
    28. }
    29. ||| →
    30. static void __queue_delayed_work(int cpu, struct workqueue_struct *wq,
    31.     struct delayed_work *dwork, unsigned long delay)
    32. {
    33.  struct timer_list *timer = &dwork->timer;
    34.  struct work_struct *work = &dwork->work;
    35.  WARN_ON_ONCE(timer->function != delayed_work_timer_fn ||
    36.        timer->data != (unsigned long)dwork);
    37.  WARN_ON_ONCE(timer_pending(timer));
    38.  WARN_ON_ONCE(!list_empty(&work->entry));
    39.  /*
    40.   * If @delay is 0, queue @dwork->work immediately.  This is for
    41.   * both optimization and correctness.  The earliest @timer can
    42.   * expire is on the closest next tick and delayed_work users depend
    43.   * on that there's no such delay when @delay is 0.
    44.   */
    45.  if (!delay) {
    46.   __queue_work(cpu, wq, &dwork->work);
    47.   return;
    48.  }
    49.  timer_stats_timer_set_start_info(&dwork->timer);
    50.  dwork->wq = wq;
    51.  dwork->cpu = cpu;
    52.  timer->expires = jiffies + delay;
    53.  if (unlikely(cpu != WORK_CPU_UNBOUND))
    54.   add_timer_on(timer, cpu);
    55.  else
    56.   add_timer(timer);
    57. }
    58. |||| →
    59. void delayed_work_timer_fn(unsigned long __data)
    60. {
    61.  struct delayed_work *dwork = (struct delayed_work *)__data;
    62.  /* should have been called from irqsafe timer with irq already off */
    63.  __queue_work(dwork->cpu, dwork->wq, &dwork->work);
    64. }

    参考资料

    1. Documentation/workqueue.txt

    b5fb81b9d619b6fcd1c00645f14077dc.jpeg

  • 相关阅读:
    python发送邮件和企业微信
    Linux中安装Redis
    Code Review:提升代码质量与团队能力的利器
    【技术分享】NetLogon于域内提权漏洞(CVE-2020-1472)
    IIS发布.net网站(配置Nginx以及HTTP和HTTPS)
    神器必会!特别好使的编辑器Source Insight
    初学Flutter:swiper实现
    2022深圳xxx校招Java笔试题目(选择题+简答题)
    web前端网页设计期末课程大作业:旅游网页主题网站设计——紫色的旅游开发景点网站静态模板(4页)HTML+CSS+JavaScript
    MyBatisPlus(二十)防全表更新与删除
  • 原文地址:https://blog.csdn.net/melody157398/article/details/126457865