• [OC Study Notes] Multithreading: GCD


    Source Analysis of Serial and Concurrent Queues

    During development, Apple gives us three ways to obtain a queue:

    // Main queue
    dispatch_queue_t mainQueue = dispatch_get_main_queue();
    // Global concurrent queue
    dispatch_queue_t globalQueue = dispatch_get_global_queue(0, 0);
    // Custom (developer-created) queue
    dispatch_queue_t normalQueue = dispatch_queue_create("com.test.serial", DISPATCH_QUEUE_SERIAL);
    

    dispatch_get_main_queue

    Opening the source, the corresponding code can be found in queue.h:

    dispatch_queue_main_t
    dispatch_get_main_queue(void)
    {
    	return DISPATCH_GLOBAL_OBJECT(dispatch_queue_main_t, _dispatch_main_q);
    }
    

    Next, look up DISPATCH_GLOBAL_OBJECT:

    #define DISPATCH_GLOBAL_OBJECT(type, object) ((type)&(object))
    // i.e. ((dispatch_queue_main_t)&_dispatch_main_q)
    

    So the type is dispatch_queue_main_t and the object is _dispatch_main_q; searching on for _dispatch_main_q:

    struct dispatch_queue_static_s _dispatch_main_q = {
    	DISPATCH_GLOBAL_OBJECT_HEADER(queue_main),
    #if !DISPATCH_USE_RESOLVERS
    	.do_targetq = _dispatch_get_default_queue(true),
    #endif
    	.dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1) |
    			DISPATCH_QUEUE_ROLE_BASE_ANON,
    	.dq_label = "com.apple.main-thread",
    	.dq_atomic_flags = DQF_THREAD_BOUND | DQF_WIDTH(1),
    	.dq_serialnum = 1,
    };
    

    As we can see, the main queue's dq_label is "com.apple.main-thread" and its dq_atomic_flags contain DQF_WIDTH(1), which is what makes it a serial queue (dq_serialnum = 1 is simply the main queue's creation index). So how is a queue created? We know the function dispatch_queue_create is used for this, so let's explore that first.
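    As a quick sanity check from user code, the public dispatch_queue_get_label API returns exactly this label:

    const char *label = dispatch_queue_get_label(dispatch_get_main_queue());
    printf("%s\n", label); // prints: com.apple.main-thread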

    dispatch_queue_create

    Now let's look at how a queue is created. Open the source and find dispatch_queue_create:

    // queue.c
    dispatch_queue_t
    dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
    {
    	return _dispatch_lane_create_with_target(label, attr,
    			DISPATCH_TARGET_QUEUE_DEFAULT, true);
    }
    

    _dispatch_lane_create_with_target

    It is implemented by calling _dispatch_lane_create_with_target with two extra default arguments. Its implementation:

    static dispatch_queue_t
    _dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
    		dispatch_queue_t tq, bool legacy)
    {
    	dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);
    
    	// Step 1: Normalize arguments (qos, overcommit, tq)
    	//
    
    	...
    
    	// Step 2: Initialize the queue
    	//
    
    	...
    	
    	// Allocate and open up memory
    	dispatch_lane_t dq = _dispatch_object_alloc(vtable,
    			sizeof(struct dispatch_lane_s));
    	// Constructor initialization; dqai.dqai_concurrent: whether the queue is concurrent
    	_dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
    			DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
    			(dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));
    
    	dq->dq_label = label;
    	dq->dq_priority = _dispatch_priority_make((dispatch_qos_t)dqai.dqai_qos,
    			dqai.dqai_relpri);
    	if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
    		dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    	}
    	if (!dqai.dqai_inactive) {
    		_dispatch_queue_priority_inherit_from_target(dq, tq);
    		_dispatch_lane_inherit_wlh_from_target(dq, tq);
    	}
    	_dispatch_retain(tq);
    	dq->do_targetq = tq;
    	_dispatch_object_debug(dq, "%s", __func__);
    	return _dispatch_trace_queue_create(dq)._dq;
    }
    

    On the first line, _dispatch_queue_attr_to_info wraps the DISPATCH_QUEUE_SERIAL or DISPATCH_QUEUE_CONCURRENT argument we passed in into a dqai. A rough look at how the wrapping is implemented:

    dispatch_queue_attr_info_t
    _dispatch_queue_attr_to_info(dispatch_queue_attr_t dqa)
    {
    	dispatch_queue_attr_info_t dqai = { };
    	// For a serial queue (NULL attribute), return an empty dqai struct directly
    	if (!dqa) return dqai;
    
    #if DISPATCH_VARIANT_STATIC
    	if (dqa == &_dispatch_queue_attr_concurrent) {
    		// ⚠️
    		dqai.dqai_concurrent = true;
    		return dqai;
    	}
    #endif
    
    	if (dqa < _dispatch_queue_attrs ||
    			dqa >= &_dispatch_queue_attrs[DISPATCH_QUEUE_ATTR_COUNT]) {
    #ifndef __APPLE__
    		if (memcmp(dqa, &_dispatch_queue_attrs[0],
    				sizeof(struct dispatch_queue_attr_s)) == 0) {
    			dqa = (dispatch_queue_attr_t)&_dispatch_queue_attrs[0];
    		} else
    #endif // __APPLE__
    		DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
    	}
    
    	size_t idx = (size_t)(dqa - _dispatch_queue_attrs);
    	
    	// Default configuration and assignment of the attribute struct's bit-fields
    	dqai.dqai_inactive = (idx % DISPATCH_QUEUE_ATTR_INACTIVE_COUNT);
    	idx /= DISPATCH_QUEUE_ATTR_INACTIVE_COUNT;
    	// ⚠️⚠️
    	dqai.dqai_concurrent = !(idx % DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT);
    	idx /= DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT;
    
    	dqai.dqai_relpri = -(int)(idx % DISPATCH_QUEUE_ATTR_PRIO_COUNT);
    	idx /= DISPATCH_QUEUE_ATTR_PRIO_COUNT;
    
    	dqai.dqai_qos = idx % DISPATCH_QUEUE_ATTR_QOS_COUNT;
    	idx /= DISPATCH_QUEUE_ATTR_QOS_COUNT;
    
    	dqai.dqai_autorelease_frequency =
    			idx % DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY_COUNT;
    	idx /= DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY_COUNT;
    
    	dqai.dqai_overcommit = idx % DISPATCH_QUEUE_ATTR_OVERCOMMIT_COUNT;
    	idx /= DISPATCH_QUEUE_ATTR_OVERCOMMIT_COUNT;
    
    	return dqai;
    }
    

    dqai has a dqai_concurrent field which, as the name suggests, indicates whether the queue is concurrent; the default (NULL attribute) is therefore serial.
    Next, how the queue is created from dqai:

    // Constructor initialization; dqai.dqai_concurrent: whether the queue is concurrent
    _dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
    		DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
    		(dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));
    

    As shown, initialization goes through the init function; the third argument is DISPATCH_QUEUE_WIDTH_MAX for a concurrent queue and 1 for a serial one.

    #define DISPATCH_QUEUE_WIDTH_FULL_BIT		0x0020000000000000ull
    #define DISPATCH_QUEUE_WIDTH_FULL			0x1000ull
    #define DISPATCH_QUEUE_WIDTH_POOL (DISPATCH_QUEUE_WIDTH_FULL - 1)
    #define DISPATCH_QUEUE_WIDTH_MAX  (DISPATCH_QUEUE_WIDTH_FULL - 2)
    #define DISPATCH_QUEUE_USES_REDIRECTION(width) \
    		({ uint16_t _width = (width); \
    		_width > 1 && _width < DISPATCH_QUEUE_WIDTH_POOL; })
    

    This is the definition of DISPATCH_QUEUE_WIDTH_MAX; it works out to DISPATCH_QUEUE_WIDTH_FULL - 2 = 0x1000 - 2 = 0xffe (4094).

    _dispatch_queue_init

    Now the internals of the init function:

    static inline dispatch_queue_class_t
    _dispatch_queue_init(dispatch_queue_class_t dqu, dispatch_queue_flags_t dqf,
    		uint16_t width, uint64_t initial_state_bits)
    {
    	uint64_t dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(width);
    	dispatch_queue_t dq = dqu._dq;
    
    	dispatch_assert((initial_state_bits & ~(DISPATCH_QUEUE_ROLE_MASK |
    			DISPATCH_QUEUE_INACTIVE)) == 0);
    
    	if (initial_state_bits & DISPATCH_QUEUE_INACTIVE) {
    		dq->do_ref_cnt += 2; // rdar://8181908 see _dispatch_lane_resume
    		if (dx_metatype(dq) == _DISPATCH_SOURCE_TYPE) {
    			dq->do_ref_cnt++; // released when DSF_DELETED is set
    		}
    	}
    
    	dq_state |= initial_state_bits;
    	dq->do_next = DISPATCH_OBJECT_LISTLESS;
    	// ⚠️⚠️
    	dqf |= DQF_WIDTH(width);
    	os_atomic_store2o(dq, dq_atomic_flags, dqf, relaxed);
    	dq->dq_state = dq_state;
    	dq->dq_serialnum =
    			os_atomic_inc_orig(&_dispatch_queue_serial_numbers, relaxed);
    	return dqu;
    }
    

    From this we can conclude:
    For a concurrent queue, dqf |= DQF_WIDTH(DISPATCH_QUEUE_WIDTH_MAX); for a serial queue, dqf |= DQF_WIDTH(1). The fundamental difference between serial and concurrent queues is the DQF_WIDTH; a serial queue's width is 1. The width can be loosely pictured as the width of the queue's exit: a serial queue is a one-way single-lane road, the tasks are cars, and cars must pass one at a time in order; a concurrent queue is a one-way multi-lane road with several exits, so cars can pass through in parallel.
    Continuing with dq->dq_serialnum = os_atomic_inc_orig(&_dispatch_queue_serial_numbers, relaxed);

    // skip zero
    // 1 - main_q
    // 2 - mgr_q
    // 3 - mgr_root_q
    // 4,5,6,7,8,9,10,11,12,13,14,15 - global queues
    // 17 - workloop_fallback_q
    // we use 'xadd' on Intel, so the initial value == next assigned
    #define DISPATCH_QUEUE_SERIAL_NUMBER_INIT 17
    extern unsigned long volatile _dispatch_queue_serial_numbers;
    

    So _dispatch_queue_serial_numbers is just a running creation index for queues, not a serial/concurrent flag: as the comment shows, 1 is reserved for the main queue (whose serial behavior comes from its width of 1), the low numbers are reserved for the manager and global queues, and custom queues count up from DISPATCH_QUEUE_SERIAL_NUMBER_INIT (17). That is why the dq_serialnum = 1 we saw earlier identifies the main queue.
    Back in _dispatch_lane_create_with_target, further down we see:

    _dispatch_retain(tq);
    dq->do_targetq = tq;
    

    So where is this tq assigned? Searching upward, in the elided part we find:

    // priority.h
    #define DISPATCH_QOS_UNSPECIFIED        ((dispatch_qos_t)0)
    #define DISPATCH_QOS_DEFAULT            ((dispatch_qos_t)4)
    
    // queue.c
        ...
    	else {
    		if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
    			// Serial queues default to overcommit!
    			// Concurrent: overcommit = _dispatch_queue_attr_overcommit_disabled
    			// Serial: overcommit = _dispatch_queue_attr_overcommit_enabled
    			overcommit = dqai.dqai_concurrent ?
    					_dispatch_queue_attr_overcommit_disabled :
    					_dispatch_queue_attr_overcommit_enabled;
    		}
    	}
    	if (!tq) {
    		tq = _dispatch_get_root_queue(
    				qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
    				overcommit == _dispatch_queue_attr_overcommit_enabled)->_as_dq;
    		if (unlikely(!tq)) {
    			DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
    		}
    	}
    	...
    
    static inline dispatch_queue_global_t
    _dispatch_get_root_queue(dispatch_qos_t qos, bool overcommit)
    {
    	if (unlikely(qos < DISPATCH_QOS_MIN || qos > DISPATCH_QOS_MAX)) {
    		DISPATCH_CLIENT_CRASH(qos, "Corrupted priority");
    	}
    	// qos is 4, so 4 - 1 = 3
    	// 2*3 + 0 or 1 = 6 or 7
    	// then take the pointer at index 6 or 7 of the _dispatch_root_queues array
    	return &_dispatch_root_queues[2 * (qos - 1) + overcommit];
    }
    

    The value of tq is taken from the _dispatch_root_queues array, so looking into that array makes things clear: tq is one of these root queues, and its dq_label is exactly the target value we see on our queue from the outside.

    // Targeted by serial queues (overcommit)
    _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
    	.dq_label = "com.apple.root.utility-qos.overcommit",
    	.dq_serialnum = 9,
    ),
    // Targeted by concurrent queues (the global queue is the same)
    _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_FALLBACK,
    	.dq_label = "com.apple.root.default-qos",
    	.dq_serialnum = 10,
    ),
    // Targeted by the main queue
    _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT,
    		DISPATCH_PRIORITY_FLAG_FALLBACK | DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
    	.dq_label = "com.apple.root.default-qos.overcommit",
    	.dq_serialnum = 11,
    ),
    

    Since the target information for serial and concurrent queues is taken from the _dispatch_root_queues struct array, where is _dispatch_root_queues itself created? Searching from libdispatch_init, the earliest initialization entry point, we eventually find some code in _dispatch_introspection_init (screenshot omitted): the queues are created one by one in a for loop that calls _dispatch_trace_queue_create on the pointers taken from _dispatch_root_queues.

    A Deeper Look at GCD

    Ways of Executing Tasks

    Task-executing functions come in two kinds, synchronous and asynchronous; a sketch contrasting them follows the list:

    • Synchronous function: dispatch_sync. It must wait for the current statement to finish before the next statement runs; it does not start a thread, and executes the task on the current thread.
    • Asynchronous function: dispatch_async. The next statement can run without waiting for the current one to finish; a thread is started to execute the task.
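    A minimal sketch contrasting the two (the queue label is arbitrary; assumes Foundation for NSLog):

    dispatch_queue_t queue = dispatch_queue_create("com.test.demo", DISPATCH_QUEUE_SERIAL);

    dispatch_sync(queue, ^{
        NSLog(@"1"); // always prints before "2": the caller waits, no new thread is started
    });
    NSLog(@"2");

    dispatch_async(queue, ^{
        NSLog(@"3"); // may print after "4": the caller does not wait
    });
    NSLog(@"4");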

    dispatch_sync: Synchronous Source Analysis

    Find the implementation of the synchronous function in the source:

    void
    dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
    {
    	uintptr_t dc_flags = DC_FLAG_BLOCK;
    	if (unlikely(_dispatch_block_has_private_data(work))) {
    		return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
    	}
    	_dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
    }
    

    unlikely means the branch is almost never taken; we then reach the _dispatch_sync_f function, whose third argument wraps the block:

    #define _dispatch_Block_invoke(bb) \
    		((dispatch_function_t)((struct Block_layout *)bb)->invoke)
    

    Continuing down:

    static void
    _dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
    		uintptr_t dc_flags)
    {
    	_dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
    }
    
    • 6

    After that we come to the _dispatch_sync_f_inline function, implemented as follows:

    static inline void
    _dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
    		dispatch_function_t func, uintptr_t dc_flags)
    {
    	// Serial queues (dq_width == 1) take this path
    	if (likely(dq->dq_width == 1)) {
    		return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
    	}
    
    	if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
    		DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    	}
    
    	dispatch_lane_t dl = upcast(dq)._dl;
    	// Global concurrent queues and queues bound to non-dispatch threads
    	// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
    	if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
    		return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
    	}
    
    	if (unlikely(dq->do_targetq->do_targetq)) {
    		return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
    	}
    	_dispatch_introspection_sync_begin(dl);
    	_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
    			_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
    }
    

    Note that _dispatch_barrier_sync_f is called here, which, as the name says, ends up in the barrier function. But why call a barrier function at all? Let's keep digging first:

    static void
    _dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
    		dispatch_function_t func, uintptr_t dc_flags)
    {
    	_dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
    }
    

    We come to _dispatch_barrier_sync_f_inline, where the func parameter is the block task we passed in from outside:

    static inline void
    _dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
    		dispatch_function_t func, uintptr_t dc_flags)
    {
    	// Get the thread ID
    	dispatch_tid tid = _dispatch_tid_self();
    
    	if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
    		DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    	}
    
    	dispatch_lane_t dl = upcast(dq)._dl;
    	// Deadlock check
    	if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
    		return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
    				DC_FLAG_BARRIER | dc_flags);
    	}
    
    	if (unlikely(dl->do_targetq->do_targetq)) {
    		return _dispatch_sync_recurse(dl, ctxt, func,
    				DC_FLAG_BARRIER | dc_flags);
    	}
    	_dispatch_introspection_sync_begin(dl);
    	_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
    			DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
    					dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
    }
    

    This function first obtains the thread id. A queue needs to be bound to a thread to have its work executed, and a deadlock in a synchronous dispatch is a "you wait for me, I wait for you" situation among the tasks on that thread; this is why only _dispatch_queue_try_acquire_barrier_sync uses the thread id:

    static inline bool
    _dispatch_queue_try_acquire_barrier_sync(dispatch_queue_class_t dq, uint32_t tid)
    {
    	return _dispatch_queue_try_acquire_barrier_sync_and_suspend(dq._dl, tid, 0);
    }
    
    static inline bool
    _dispatch_queue_try_acquire_barrier_sync_and_suspend(dispatch_lane_t dq,
    		uint32_t tid, uint64_t suspend_count)
    {
    	uint64_t init  = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
    	uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
    			_dispatch_lock_value_from_tid(tid) |
    			DISPATCH_QUEUE_UNCONTENDED_SYNC |
    			(suspend_count * DISPATCH_QUEUE_SUSPEND_INTERVAL);
    	uint64_t old_state, new_state;
    	// Query the OS layer: compute new_state from the thread and the current queue's state and return it
    	return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
    		uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
    		if (old_state != (init | role)) {
    			os_atomic_rmw_loop_give_up(break);
    		}
    		new_state = value | role;
    	});
    }
    

    If acquiring the barrier fails (the read-modify-write loop gives up), execution continues into _dispatch_sync_f_slow:

    static void
    _dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
    		dispatch_function_t func, uintptr_t top_dc_flags,
    		dispatch_queue_class_t dqu, uintptr_t dc_flags)
    {
    	dispatch_queue_t top_dq = top_dqu._dq;
    	dispatch_queue_t dq = dqu._dq;
    	if (unlikely(!dq->do_targetq)) {
    		return _dispatch_sync_function_invoke(dq, ctxt, func);
    	}
    
    	pthread_priority_t pp = _dispatch_get_priority();
    	// Initialize the struct that saves the block and other info
    	struct dispatch_sync_context_s dsc = {
    		.dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
    		.dc_func     = _dispatch_async_and_wait_invoke,
    		.dc_ctxt     = &dsc,
    		.dc_other    = top_dq,
    		.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
    		.dc_voucher  = _voucher_get(),
    		.dsc_func    = func,
    		.dsc_ctxt    = ctxt,
    		.dsc_waiter  = _dispatch_tid_self(),
    	};
    	// Push the block onto the queue
    	_dispatch_trace_item_push(top_dq, &dsc);
    	// The function where the deadlock is ultimately detected
    	__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
    
    	if (dsc.dsc_func == NULL) {
    		// dsc_func being cleared means that the block ran on another thread ie.
    		// case (2) as listed in _dispatch_async_and_wait_f_slow.
    		dispatch_queue_t stop_dq = dsc.dc_other;
    		return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
    	}
    
    	_dispatch_introspection_sync_begin(top_dq);
    	_dispatch_trace_item_pop(top_dq, &dsc);
    	_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
    			DISPATCH_TRACE_ARG(&dsc));
    }
    

    From _dispatch_trace_item_push we can see that a queue is really just an object for submitting blocks to; once pushed onto the queue, blocks are processed in first-in-first-out (FIFO) order, and underneath GCD the system maintains a thread pool to execute them. We'll analyze deadlock in detail in a moment.
    The rough call flow of the block:

    dispatch_sync
    _dispatch_sync_f
    _dispatch_sync_invoke_and_complete
    _dispatch_sync_function_invoke_inline
    _dispatch_client_callout
    f(ctxt);

    In _dispatch_barrier_sync_f_inline, func is passed straight into _dispatch_lane_barrier_sync_invoke_and_complete; let's look at that next:

    static void
    _dispatch_lane_barrier_sync_invoke_and_complete(dispatch_lane_t dq,
    		void *ctxt, dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
    {
    	_dispatch_sync_function_invoke_inline(dq, ctxt, func);
    	...
    }
    
    static inline void
    _dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
    		dispatch_function_t func)
    {
    	dispatch_thread_frame_s dtf;
    	_dispatch_thread_frame_push(&dtf, dq);
    	// ⚠️⚠️ func is used here
    	_dispatch_client_callout(ctxt, func);
    	_dispatch_perfmon_workitem_inc();
    	_dispatch_thread_frame_pop(&dtf);
    }
    
    void
    _dispatch_client_callout(void *ctxt, dispatch_function_t f)
    {
    	_dispatch_get_tsd_base();
    	void *u = _dispatch_get_unwind_tsd();
    	if (likely(!u)) return f(ctxt);
    	_dispatch_set_unwind_tsd(NULL);
    	// ⚠️⚠️
    	f(ctxt);
    	_dispatch_free_unwind_tsd();
    	_dispatch_set_unwind_tsd(u);
    }
    

    We have finally reached the place where our block executes. As we can see, the block is invoked directly here, so for a synchronous function we can crudely say its task runs right away. Having seen how the block executes, let's look at the deadlock function itself.
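    A small sketch showing this (with a queue that doesn't deadlock, the block runs inline, typically on the calling thread):

    NSLog(@"caller: %@", [NSThread currentThread]);
    dispatch_sync(dispatch_queue_create("com.test.sync", DISPATCH_QUEUE_SERIAL), ^{
        NSLog(@"block:  %@", [NSThread currentThread]); // typically the same thread as the caller
    });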

    Deadlock

    static void
    __DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
    {
    	uint64_t dq_state = _dispatch_wait_prepare(dq);
    	if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
    		DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
    				"dispatch_sync called on queue "
    				"already owned by current thread");
    	}
    
    	// Blocks submitted to the main thread MUST run on the main thread, and
    	// dispatch_async_and_wait also executes on the remote context rather than
    	// the current thread.
    	//
    	// For both these cases we need to save the frame linkage for the sake of
    	// _dispatch_async_and_wait_invoke
    	
    	_dispatch_thread_frame_save_state(&dsc->dsc_dtf);
    
    	...
    }
    

    As shown, if the _dq_state_drain_locked_by condition holds, a crash is triggered. Now look at the implementation of the function used in that condition:

    static inline bool
    _dq_state_drain_locked_by(uint64_t dq_state, dispatch_tid tid)
    {
    	return _dispatch_lock_is_locked_by((dispatch_lock)dq_state, tid);
    }
    
    static inline bool
    _dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
    {
    	// equivalent to _dispatch_lock_owner(lock_value) == tid
    	// lock_value is the queue state, tid is the thread id
    	// ^ (XOR): identical bits give 0, differing bits give 1
    	return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
    }
    

    In other words, with dispatch_sync on a serial queue, the queue is bound to the thread currently draining it; if the thread executing the code that submits the task is that same thread, a deadlock is detected and the process crashes.
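    The classic way to trigger this crash is a synchronous dispatch onto the serial queue the current code is already running on (a sketch; the label is arbitrary):

    dispatch_queue_t queue = dispatch_queue_create("com.test.deadlock", DISPATCH_QUEUE_SERIAL);
    dispatch_async(queue, ^{
        // This block is currently draining `queue`.
        dispatch_sync(queue, ^{       // waits for `queue` to become free...
            NSLog(@"never reached");  // ...but `queue` is waiting for the outer block: crash
        });
    });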

    The Barrier Function

    Now to the earlier question: why call the barrier function? If this applied to a concurrent queue, wouldn't the current task have to wait for all earlier tasks to finish first? Testing shows (screenshots omitted) that the barrier path is not taken for a concurrent queue. Back to the source where the barrier is invoked:

    if (likely(dq->dq_width == 1)) {
    	return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
    }
    

    The condition for the barrier call is dq_width == 1, and as established above only serial queues have a dq_width of 1. So serial queues take the branch above, while concurrent queues take the branch below and finally call:

    _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
    			_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
    

    This can be understood as follows: on a serial queue, a synchronous call behaves like a barrier, waiting for the queue's earlier tasks to complete before running the current one; there is a single line, first come first served, no cutting in. On a concurrent queue several lines are allowed and don't interfere with each other: the synchronous task does not wait for previously submitted tasks to finish, it is simply executed right away.
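    For comparison, the public barrier API gives the same exclusive semantics on a concurrent queue explicitly (a sketch; the queue label is arbitrary):

    dispatch_queue_t cq = dispatch_queue_create("com.test.concurrent", DISPATCH_QUEUE_CONCURRENT);
    dispatch_async(cq, ^{ NSLog(@"read 1"); });  // may run in parallel with read 2
    dispatch_async(cq, ^{ NSLog(@"read 2"); });
    dispatch_barrier_async(cq, ^{
        NSLog(@"barrier: runs alone, after read 1 and read 2");
    });
    dispatch_async(cq, ^{ NSLog(@"read 3"); });  // only starts after the barrier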

    dispatch_async: Asynchronous Source Analysis

    An asynchronous function needs to start a thread to execute its task, so this should be a key part. First, the implementation:

    void
    dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
    {
    	dispatch_continuation_t dc = _dispatch_continuation_alloc();
    	uintptr_t dc_flags = DC_FLAG_CONSUME;
    	dispatch_qos_t qos;
    
    	qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
    	_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
    }
    

    The asynchronous function first wraps the block via _dispatch_continuation_init, i.e. saves it in function form. The code:

    static inline dispatch_qos_t
    _dispatch_continuation_init(dispatch_continuation_t dc,
    		dispatch_queue_class_t dqu, dispatch_block_t work,
    		dispatch_block_flags_t flags, uintptr_t dc_flags)
    {
    	void *ctxt = _dispatch_Block_copy(work);
    
    	dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
    	if (unlikely(_dispatch_block_has_private_data(work))) {
    		dc->dc_flags = dc_flags;
    		dc->dc_ctxt = ctxt;
    		// will initialize all fields but requires dc_flags & dc_ctxt to be set
    		return _dispatch_continuation_init_slow(dc, dqu, flags);
    	}
    
    	dispatch_function_t func = _dispatch_Block_invoke(work);
    	if (dc_flags & DC_FLAG_CONSUME) {
    		func = _dispatch_call_block_and_release;
    	}
    	return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
    }
    

    The block is wrapped into a func of type dispatch_function_t and then processed further. Next, the continued handling of work:

    static inline void
    _dispatch_continuation_async(dispatch_queue_class_t dqu,
    		dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
    {
    #if DISPATCH_INTROSPECTION
    	if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
    		_dispatch_trace_item_push(dqu, dc);
    	}
    #else
    	(void)dc_flags;
    #endif
    	return dx_push(dqu._dq, dc, qos);
    }
    

    Note dx_push here; its definition is found among the macros:

    #define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
    

    Different kinds of queues execute different dq_push functions here (screenshot omitted). It's not hard to see that the dqu in dx_push(dqu._dq, dc, qos) is simply the queue we passed in.
    Tracing the block's call flow with breakpoints (screenshot omitted) shows that _dispatch_worker_thread2 is called. That function is only ever called from _dispatch_root_queues_init_once, which itself is only called from _dispatch_root_queues_init. Searching on, _dispatch_root_queues_init is called inside _dispatch_root_queue_poke_slow, a key function. _dispatch_root_queue_poke_slow is called in the return of _dispatch_root_queue_poke; _dispatch_root_queue_poke in turn can be called from _dispatch_root_queue_push_inline, and that inline function is clearly called by _dispatch_root_queue_push. Having also read a number of other write-ups, I now have a rough picture of this flow; let's walk through it using a concurrent queue as the example.

    The Block's Call Flow

    Concurrent queue: _dispatch_lane_concurrent_push

    void
    _dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
    		dispatch_qos_t qos)
    {
    	if (dq->dq_items_tail == NULL &&
    			!_dispatch_object_is_waiter(dou) &&
    			!_dispatch_object_is_barrier(dou) &&
    			_dispatch_queue_try_acquire_async(dq)) {
    		// Non-barrier case: take the _redirect_push path
    		return _dispatch_continuation_redirect_push(dq, dou, qos);
    	}
    	// All other cases:
    	_dispatch_lane_push(dq, dou, qos);
    }
    

    In the non-barrier case, we continue into _dispatch_continuation_redirect_push:

    static void
    _dispatch_continuation_redirect_push(dispatch_lane_t dl,
    		dispatch_object_t dou, dispatch_qos_t qos)
    {
    	if (likely(!_dispatch_object_is_redirection(dou))) {
    		dou._dc = _dispatch_async_redirect_wrap(dl, dou);
    	} else if (!dou._dc->dc_ctxt) {
    		// find first queue in descending target queue order that has
    		// an autorelease frequency set, and use that as the frequency for
    		// this continuation.
    		dou._dc->dc_ctxt = (void *)
    		(uintptr_t)_dispatch_queue_autorelease_frequency(dl);
    	}
    	dispatch_queue_t dq = dl->do_targetq;
    	if (!qos) qos = _dispatch_priority_qos(dq->dq_priority);
    	dx_push(dq, dou, qos); // dx_push again; this time the dq argument is do_targetq
    	// because GCD queues are objects too, with inheritance-like wrapping, similar to class / superclass / root class.
    }
    

    Here we run into dx_push again, i.e. recursion. Combined with what we saw during queue creation, a queue is itself an object with a parent class and a root class, so the recursion eventually lands in the root class's method.
    So what is do_targetq? We have to go back to queue creation in dispatch_queue_create (screenshots omitted): tq is the root_queue, and tq is assigned to do_targetq, so do_targetq is the root_queue.
    The root_queue's type is dispatch_queue_global_t, so the queue at the dx_push inside _dispatch_continuation_redirect_push is a dispatch_queue_global_t, and we need to find the matching dq_push method:

    void
    _dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
    		dispatch_qos_t qos)
    {
    #if DISPATCH_USE_KEVENT_WORKQUEUE
    	dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
    	if (unlikely(ddi && ddi->ddi_can_stash)) {
    		dispatch_object_t old_dou = ddi->ddi_stashed_dou;
    		dispatch_priority_t rq_overcommit;
    		rq_overcommit = rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    
    		if (likely(!old_dou._do || rq_overcommit)) {
    			dispatch_queue_global_t old_rq = ddi->ddi_stashed_rq;
    			dispatch_qos_t old_qos = ddi->ddi_stashed_qos;
    			ddi->ddi_stashed_rq = rq;
    			ddi->ddi_stashed_dou = dou;
    			ddi->ddi_stashed_qos = qos;
    			_dispatch_debug("deferring item %p, rq %p, qos %d",
    					dou._do, rq, qos);
    			if (rq_overcommit) {
    				ddi->ddi_can_stash = false;
    			}
    			if (likely(!old_dou._do)) {
    				return;
    			}
    			// push the previously stashed item
    			qos = old_qos;
    			rq = old_rq;
    			dou = old_dou;
    		}
    	}
    #endif
    #if HAVE_PTHREAD_WORKQUEUE_QOS
    	if (_dispatch_root_queue_push_needs_override(rq, qos)) {
    		return _dispatch_root_queue_push_override(rq, dou, qos);
    	}
    #else
    	(void)qos;
    #endif
    	_dispatch_root_queue_push_inline(rq, dou, dou, 1);
    }
    

    Into _dispatch_root_queue_push_inline:

    static inline void
    _dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
    		dispatch_object_t _head, dispatch_object_t _tail, int n)
    {
    	struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
    	if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
    		return _dispatch_root_queue_poke(dq, n, 0);
    	}
    }
    

    Into _dispatch_root_queue_poke:

    void
    _dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
    {
    	if (!_dispatch_queue_class_probe(dq)) {
    		return;
    	}
    #if !DISPATCH_USE_INTERNAL_WORKQUEUE
    #if DISPATCH_USE_PTHREAD_POOL
    	if (likely(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE))
    #endif
    	{
    		if (unlikely(!os_atomic_cmpxchg2o(dq, dgq_pending, 0, n, relaxed))) {
    			_dispatch_root_queue_debug("worker thread request still pending "
    					"for global queue: %p", dq);
    			return;
    		}
    	}
    #endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
    	return _dispatch_root_queue_poke_slow(dq, n, floor);
    }
    

    Next we arrive at the _dispatch_root_queue_poke_slow function mentioned earlier.

    _dispatch_root_queue_poke_slow

    static void
    _dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
    {
    	int remaining = n;
    #if !defined(_WIN32)
    	int r = ENOSYS;
    #endif
    
    	_dispatch_root_queues_init();
    	_dispatch_debug_root_queue(dq, __func__);
    	_dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);
    
    #if !DISPATCH_USE_INTERNAL_WORKQUEUE
    #if DISPATCH_USE_PTHREAD_ROOT_QUEUES
    	if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
    	// For a global root queue, worker threads are requested to handle the work.
    #endif
    	{
    		_dispatch_root_queue_debug("requesting new worker thread for global "
    				"queue: %p", dq);
    		r = _pthread_workqueue_addthreads(remaining,
    				_dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
    		(void)dispatch_assume_zero(r);
    		return;
    	}
    #endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
    #if DISPATCH_USE_PTHREAD_POOL
    	dispatch_pthread_root_queue_context_t pqc = dq->do_ctxt;
    	if (likely(pqc->dpq_thread_mediator.do_vtable)) {
    		while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
    			_dispatch_root_queue_debug("signaled sleeping worker for "
    					"global queue: %p", dq);
    			if (!--remaining) {
    				return;
    			}
    		}
    	}
    
    	bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    	if (overcommit) {
    		// Overcommit case (serial queues target overcommit root queues)
    		os_atomic_add2o(dq, dgq_pending, remaining, relaxed);
    	} else {
    		if (!os_atomic_cmpxchg2o(dq, dgq_pending, 0, remaining, relaxed)) {
    			_dispatch_root_queue_debug("worker thread request still pending for "
    					"global queue: %p", dq);
    			return;
    		}
    	}
    	// floor is 0; remaining is derived from the queue's task situation
    	int can_request, t_count;
    	// seq_cst with atomic store to tail 
    	// Get the size of the thread pool
    	t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
    	// For an ordinary, self-created queue we fall into the do-while loop below.
    	// dgq_thread_pool_size is temporarily marked 1 here; a normal concurrent queue has 0, while the global one has 1 because it gets one more thread than a normal concurrent queue.
    	/*
    	static const struct dispatch_queue_global_s _dispatch_custom_workloop_root_queue = {
    		DISPATCH_GLOBAL_OBJECT_HEADER(queue_global),
    		.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
    		.do_ctxt = NULL,
    		.dq_label = "com.apple.root.workloop-custom",
    		.dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL),
    		.dq_priority = _dispatch_priority_make_fallback(DISPATCH_QOS_DEFAULT) |
    			DISPATCH_PRIORITY_SATURATED_OVERRIDE,
    		.dq_serialnum = DISPATCH_QUEUE_SERIAL_NUMBER_WLF,
    		.dgq_thread_pool_size = 1,
    	};
    	*/
    	do {
    		// Compute how many threads can be requested
    		// t_count was obtained via os_atomic_load2o
    		// floor is the parameter passed in earlier
    		can_request = t_count < floor ? 0 : t_count - floor;
    		if (remaining > can_request) {
    			_dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
    					remaining, can_request);
    			os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
    			remaining = can_request;
    		}
    		// remaining normally doesn't exceed can_request; if it does, the request is trimmed down.
    		// remaining is the number of threads needed, can_request the number that can be requested; if remaining reaches 0, the thread pool is full and we simply return.
    		if (remaining == 0) {
    			// The thread pool is full; report it and bail out
    			_dispatch_root_queue_debug("pthread pool is full for root queue: "
    					"%p", dq);
    			return;
    		}
    	} while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count,
    			t_count - remaining, &t_count, acquire));
    
    #if !defined(_WIN32)
    	pthread_attr_t *attr = &pqc->dpq_thread_attr;
    	pthread_t tid, *pthr = &tid;
    #if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
    	if (unlikely(dq == &_dispatch_mgr_root_queue)) {
    		pthr = _dispatch_mgr_root_queue_init();
    	}
    #endif
    	do {
    		_dispatch_retain(dq); // released in _dispatch_worker_thread
    		// Create a thread
    		while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
    			if (r != EAGAIN) {
    				(void)dispatch_assume_zero(r);
    			}
    			_dispatch_temporary_resource_shortage();
    		}
    	} while (--remaining);
    #else // defined(_WIN32)
    #if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
    	if (unlikely(dq == &_dispatch_mgr_root_queue)) {
    		_dispatch_mgr_root_queue_init();
    	}
    #endif
    	do {
    		_dispatch_retain(dq); // released in _dispatch_worker_thread
    #if DISPATCH_DEBUG
    		unsigned dwStackSize = 0;
    #else
    		unsigned dwStackSize = 64 * 1024;
    #endif
    		uintptr_t hThread = 0;
    		while (!(hThread = _beginthreadex(NULL, dwStackSize, _dispatch_worker_thread_thunk, dq, STACK_SIZE_PARAM_IS_A_RESERVATION, NULL))) {
    			if (errno != EAGAIN) {
    				(void)dispatch_assume(hThread);
    			}
    			_dispatch_temporary_resource_shortage();
    		}
    #if DISPATCH_USE_PTHREAD_ROOT_QUEUES
    		if (_dispatch_mgr_sched.prio > _dispatch_mgr_sched.default_prio) {
    			(void)dispatch_assume_zero(SetThreadPriority((HANDLE)hThread, _dispatch_mgr_sched.prio) == TRUE);
    		}
    #endif
    		CloseHandle((HANDLE)hThread);
    	} while (--remaining);
    #endif // defined(_WIN32)
    #else
    	(void)floor;
    #endif // DISPATCH_USE_PTHREAD_POOL
    }
    

    It first calls _dispatch_root_queues_init(); a look:

    static inline void
    _dispatch_root_queues_init(void)
    {
    	dispatch_once_f(&_dispatch_root_queues_pred, NULL,
    			_dispatch_root_queues_init_once);
    }
    

    This is a once-only call that invokes _dispatch_root_queues_init_once; inside it (screenshot omitted), the _dispatch_worker_thread2 mentioned above is handed over to the pthread API: GCD is a wrapper over pthread. The actual invocation happens through the workloop rather than immediately, under the CPU's scheduling control.

    Semaphore Source Analysis

    The semaphore API is small; dispatch_semaphore_t has three commonly used functions, and a usage sketch follows the list:

    • dispatch_semaphore_create — create the semaphore
    • dispatch_semaphore_wait — wait on the semaphore
    • dispatch_semaphore_signal — signal (release) the semaphore
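    A common usage sketch: capping concurrency with the semaphore (the values are arbitrary):

    dispatch_semaphore_t sema = dispatch_semaphore_create(2); // at most 2 tasks at a time
    dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
    for (int i = 0; i < 6; i++) {
        dispatch_async(queue, ^{
            dispatch_semaphore_wait(sema, DISPATCH_TIME_FOREVER); // value-1; blocks while < 0
            NSLog(@"task %d", i);
            dispatch_semaphore_signal(sema);                      // value+1; wakes one waiter
        });
    }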

    dispatch_semaphore_create

    dispatch_semaphore_t
    dispatch_semaphore_create(intptr_t value)
    {
    	dispatch_semaphore_t dsema;
    
    	// If the internal value is negative, then the absolute of the value is
    	// equal to the number of waiting threads. Therefore it is bogus to
    	// initialize the semaphore with a negative value.
    	// The initial value must be >= 0
    	if (value < 0) {
    		return DISPATCH_BAD_INPUT;
    	}
    	// Allocate memory
    	dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
    			sizeof(struct dispatch_semaphore_s));
    	dsema->do_next = DISPATCH_OBJECT_LISTLESS;
    	dsema->do_targetq = _dispatch_get_default_queue(false);
    	// Save the initial value
    	dsema->dsema_value = value;
    	// Initialize the underlying semaphore
    	_dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
    	dsema->dsema_orig = value;
    	return dsema;
    }
    

    If the value is negative, a DISPATCH_BAD_INPUT is returned, i.e. (void *_Nonnull)0 (#define DISPATCH_BAD_INPUT ((void *_Nonnull)0)).
    If the value is greater than or equal to 0, the dispatch_semaphore_t object dsema is initialized and returned.

    dispatch_semaphore_wait

    intptr_t
    dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
    {
    	long value = os_atomic_dec2o(dsema, dsema_value, acquire);
    	if (likely(value >= 0)) {
    		return 0;
    	}
    	return _dispatch_semaphore_wait_slow(dsema, timeout);
    }
    

    os_atomic_dec2o is a macro performing an atomic decrement (on older toolchains it came down to GCC's __sync_sub_and_fetch builtin): it decrements dsema's value by one and assigns the new value to value. If the decremented value is greater than or equal to 0, it returns 0 immediately and nothing further happens; otherwise _dispatch_semaphore_wait_slow is called. _dispatch_semaphore_wait_slow distinguishes three cases based on the timeout parameter.

    static intptr_t
    _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
    		dispatch_time_t timeout)
    {
    	long orig;
    
    	_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
    	switch (timeout) {
    	default:
    		if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
    			break;
    		}
    		// Fall through and try to undo what the fast path did to
    		// dsema->dsema_value
    	case DISPATCH_TIME_NOW:
    		orig = dsema->dsema_value;
    		while (orig < 0) {
    			if (os_atomic_cmpxchgv2o(dsema, dsema_value, orig, orig + 1,
    					&orig, relaxed)) {
    				return _DSEMA4_TIMEOUT();
    			}
    		}
    		// Another thread called semaphore_signal().
    		// Fall through and drain the wakeup.
    	case DISPATCH_TIME_FOREVER:
    		_dispatch_sema4_wait(&dsema->dsema_sema);
    		break;
    	}
    	return 0;
    }
    

    In the first case (DISPATCH_TIME_NOW), the while condition is guaranteed to hold: if value were greater than or equal to 0, dispatch_semaphore_wait would already have returned. The inner if then also holds, so value is incremented by one (back to 0) and the function returns; the increment cancels the decrement the wait function performed at the start. The caller receives KERN_OPERATION_TIMED_OUT, indicating the wait returned because the timeout expired.
    The second case is DISPATCH_TIME_FOREVER, which calls the system's semaphore_wait and keeps waiting until a signal call arrives.
    The third is the default branch: with a specified timeout, _dispatch_sema4_timedwait checks whether the operation has timed out. This is similar to the DISPATCH_TIME_FOREVER handling, except the kernel's semaphore_timedwait is used, which accepts a timeout.
    Next, the _dispatch_sema4_wait called in the common DISPATCH_TIME_FOREVER case:

    void
    _dispatch_sema4_wait(_dispatch_sema4_t *sema)
    {
    	kern_return_t kr;
    	do {
    		kr = semaphore_wait(*sema);
    	} while (kr == KERN_ABORTED);
    	DISPATCH_SEMAPHORE_VERIFY_KR(kr);
    }
    

    There is a do-while loop here, but it is not really a busy wait: semaphore_wait blocks the thread in the kernel, and the loop only retries when the wait is interrupted (KERN_ABORTED).

    dispatch_semaphore_signal

    intptr_t
    dispatch_semaphore_signal(dispatch_semaphore_t dsema)
    {
    	long value = os_atomic_inc2o(dsema, dsema_value, release);
    	if (likely(value > 0)) {
    		return 0;
    	}
    	if (unlikely(value == LONG_MIN)) {
    		DISPATCH_CLIENT_CRASH(value,
    				"Unbalanced call to dispatch_semaphore_signal()");
    	}
    	return _dispatch_semaphore_signal_slow(dsema);
    }
    DISPATCH_NOINLINE
    intptr_t
    _dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
    {
    	_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
    	_dispatch_sema4_signal(&dsema->dsema_sema, 1);
    	return 1;
    }
    
    • First the semaphore's value is incremented.
    • If the resulting value is greater than 0, return immediately.
    • Otherwise execute _dispatch_semaphore_signal_slow.

    dispatch_once: One-Time Code

    In fact, this "one-time code" is just the dispatch_once we use all the time, for code that should execute only once (a singleton, say).
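    The familiar usage pattern first (MyManager is a placeholder class name):

    + (instancetype)sharedInstance {
        static MyManager *instance = nil;
        static dispatch_once_t onceToken;
        dispatch_once(&onceToken, ^{
            instance = [[MyManager alloc] init]; // runs exactly once, thread-safely
        });
        return instance;
    }

    Now for the implementation: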

    void
    dispatch_once(dispatch_once_t *val, dispatch_block_t block)
    {
    	dispatch_once_f(val, block, _dispatch_Block_invoke(block));
    }
    void
    dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
    {
    	dispatch_once_gate_t l = (dispatch_once_gate_t)val;
    
    #if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    	uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
    	// If the marker is DLOCK_ONCE_DONE, it has already run: return right away
    	if (likely(v == DLOCK_ONCE_DONE)) {
    		return;
    	}
    #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    	// Already-completed branch
    	if (likely(DISPATCH_ONCE_IS_GEN(v))) {
    		return _dispatch_once_mark_done_if_quiesced(l, v);
    	}
    #endif
    #endif
    	// Not-yet-executed branch
    	if (_dispatch_once_gate_tryenter(l)) {
    		return _dispatch_once_callout(l, ctxt, func);
    	}
    	// Branch for when another thread is currently executing it
    	return _dispatch_once_wait(l);
    }
    

    First, a look at the macro definitions:

    #if defined(__x86_64__) || defined(__i386__) || defined(__s390x__)
    #define DISPATCH_ONCE_USE_QUIESCENT_COUNTER 0
    #elif __APPLE__
    #define DISPATCH_ONCE_USE_QUIESCENT_COUNTER 1
    #else
    #define DISPATCH_ONCE_USE_QUIESCENT_COUNTER 0
    #endif
    
    #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    #define DISPATCH_ONCE_MAKE_GEN(gen)  (((gen) << 2) + DLOCK_FAILED_TRYLOCK_BIT)
    #define DISPATCH_ONCE_IS_GEN(gen)    (((gen) & 3) == DLOCK_FAILED_TRYLOCK_BIT)
    

    Look at the completion-marking macro, which is applied in the callout path: the generation is shifted left by 2 bits and DLOCK_FAILED_TRYLOCK_BIT (0b10) is added, so once this has happened, the check below is always YES: it masks with 3 (0b11, i.e. the low 2 bits) and compares against 0b10.
    On first entry we reach _dispatch_once_gate_tryenter, which performs a locking operation, a thread-safe wrapper for the multithreaded case; after that, _dispatch_once_callout is called. If the gate is already locked, _dispatch_once_wait waits indefinitely for it to be unlocked.

    static inline bool
    _dispatch_once_gate_tryenter(dispatch_once_gate_t l)
    {
    	return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
    			(uintptr_t)_dispatch_lock_value_for_self(), relaxed);
    }
    

    This uses the low-level os_atomic_cmpxchg for an atomic compare-and-swap: if dgo_once still equals DLOCK_ONCE_UNLOCKED, it is swapped to the current thread's lock value, i.e. the gate is locked. Next, _dispatch_once_callout:

    static void
    _dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
    		dispatch_function_t func)
    {
    	// Execute the block
    	_dispatch_client_callout(ctxt, func);
    	// Mark the execution state
    	_dispatch_once_gate_broadcast(l);
    }
    

    _dispatch_once_callout calls _dispatch_client_callout to execute the block, then _dispatch_once_gate_broadcast to handle the completion marker.

    #undef _dispatch_client_callout
    DISPATCH_NOINLINE
    void
    _dispatch_client_callout(void *ctxt, dispatch_function_t f)
    {
    	_dispatch_get_tsd_base();
    	void *u = _dispatch_get_unwind_tsd();
    	if (likely(!u)) return f(ctxt);
    	_dispatch_set_unwind_tsd(NULL);
    	f(ctxt);
    	_dispatch_free_unwind_tsd();
    	_dispatch_set_unwind_tsd(u);
    }
    

    Next, the implementation of _dispatch_once_gate_broadcast:

    static inline void
    _dispatch_once_gate_broadcast(dispatch_once_gate_t l)
    {
    	dispatch_lock value_self = _dispatch_lock_value_for_self();
    	uintptr_t v;
    #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    	// Obtain the completed value
    	v = _dispatch_once_mark_quiescing(l);
    #else
    	v = _dispatch_once_mark_done(l);
    #endif
    	if (likely((dispatch_lock)v == value_self)) return;
    	_dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
    }
    
    static inline uintptr_t
    _dispatch_once_mark_quiescing(dispatch_once_gate_t dgo)
    {
    	// Thread-safe exchange
    	return os_atomic_xchg(&dgo->dgo_once, _dispatch_once_generation(), release);
    }
    

    DISPATCH_ONCE_MAKE_GEN is what produces that completed value.
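    To make the gate mechanics concrete, here is a minimal sketch of the same idea in portable C11 atomics. This is only an illustration of the technique, not libdispatch's code; my_once and the state constants are made up:

    #include <stdatomic.h>

    #define ONCE_UNLOCKED 0   // never run
    #define ONCE_RUNNING  1   // some thread is running the block
    #define ONCE_DONE     2   // the block has completed

    static void my_once(atomic_int *gate, void (*fn)(void)) {
        // Fast path: already done (like the DLOCK_ONCE_DONE check).
        if (atomic_load_explicit(gate, memory_order_acquire) == ONCE_DONE) return;
        int expected = ONCE_UNLOCKED;
        // Try to take the gate with a compare-and-swap (like _dispatch_once_gate_tryenter).
        if (atomic_compare_exchange_strong(gate, &expected, ONCE_RUNNING)) {
            fn();  // the once-block runs exactly once
            atomic_store_explicit(gate, ONCE_DONE, memory_order_release); // broadcast "done"
        } else {
            // Another thread is running it; spin for simplicity.
            // libdispatch instead blocks the waiter (_dispatch_once_wait).
            while (atomic_load_explicit(gate, memory_order_acquire) != ONCE_DONE) { }
        }
    }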

    Group Source Analysis

    Next, a quick look at the group-related source.

    dispatch_group_create

    First, how a group is created:

    dispatch_group_t
    dispatch_group_create(void)
    {
    	return _dispatch_group_create_with_count(0);
    }
    static inline dispatch_group_t
    _dispatch_group_create_with_count(uint32_t n)
    {
    	dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
    			sizeof(struct dispatch_group_s));
    	dg->do_next = DISPATCH_OBJECT_LISTLESS;
    	dg->do_targetq = _dispatch_get_default_queue(false);
    	if (n) {
    		os_atomic_store2o(dg, dg_bits,
    				(uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
    		os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // 
    	}
    	return dg;
    }
    

    This function just allocates memory, but os_atomic_store2o shows that a group also maintains a value underneath.

    dispatch_group_enter

    From enter we can see that entering the group decrements the value by DISPATCH_GROUP_VALUE_INTERVAL (4) via os_atomic_sub_orig2o:

    void
    dispatch_group_enter(dispatch_group_t dg)
    {
    	// The value is decremented on a 32bits wide atomic so that the carry
    	// for the 0 -> -1 transition is not propagated to the upper 32bits.
    	uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
    			DISPATCH_GROUP_VALUE_INTERVAL, acquire);
    	uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
    	if (unlikely(old_value == 0)) {
    		_dispatch_retain(dg); // 
    	}
    	if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
    		DISPATCH_CLIENT_CRASH(old_bits,
    				"Too many nested calls to dispatch_group_enter()");
    	}
    }
    

    dispatch_group_leave

    Leaving the group adds the value back; when the increment balances the last outstanding enter (old_value == DISPATCH_GROUP_VALUE_1), _dispatch_group_wake is called so the code waiting on the group can continue:

    void
    dispatch_group_leave(dispatch_group_t dg)
    {
    	// The value is incremented on a 64bits wide atomic so that the carry for
    	// the -1 -> 0 transition increments the generation atomically.
    	uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
    			DISPATCH_GROUP_VALUE_INTERVAL, release);
    	uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
    
    	if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
    		old_state += DISPATCH_GROUP_VALUE_INTERVAL;
    		do {
    			new_state = old_state;
    			if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
    				new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
    				new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
    			} else {
    				// If the group was entered again since the atomic_add above,
    				// we can't clear the waiters bit anymore as we don't know for
    				// which generation the waiters are for
    				new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
    			}
    			if (old_state == new_state) break;
    		} while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
    				old_state, new_state, &old_state, relaxed)));
    		return _dispatch_group_wake(dg, old_state, true);
    	}
    
    	if (unlikely(old_value == 0)) {
    		DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
    				"Unbalanced call to dispatch_group_leave()");
    	}
    }
    

    dispatch_group_async

    dispatch_group_async is just a wrapper around enter and leave: once the block has been called out, the group is left. (A combined usage sketch appears after the dispatch_group_wait discussion below.)

    void
    dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
    		dispatch_block_t db)
    {
    	dispatch_continuation_t dc = _dispatch_continuation_alloc();
    	uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
    	dispatch_qos_t qos;
    	// Save the task
    	qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
    	_dispatch_continuation_group_async(dg, dq, dc, qos);
    }
    
    static inline void
    _dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
    		dispatch_continuation_t dc, dispatch_qos_t qos)
    {
    	// Enter the group
    	dispatch_group_enter(dg);
    	dc->dc_data = dg;
    	_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
    }
    
    static inline void
    _dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
    {
        struct dispatch_object_s *dou = dc->dc_data;
        unsigned long type = dx_type(dou);
        if (type == DISPATCH_GROUP_TYPE) {
        	_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
        	_dispatch_trace_item_complete(dc);
        	// Leave the group
        	dispatch_group_leave((dispatch_group_t)dou);
        } else {
        	DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
        }
    }
    

    dispatch_group_wait

    This function waits for all tasks in the group to complete; it can be understood as a wrapper over semaphore-style waiting.

    intptr_t
    dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout)
    {
    	uint64_t old_state, new_state;
    
    	os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, relaxed, {
    		if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
    			os_atomic_rmw_loop_give_up_with_fence(acquire, return 0);
    		}
    		if (unlikely(timeout == 0)) {
    			os_atomic_rmw_loop_give_up(return _DSEMA4_TIMEOUT());
    		}
    		new_state = old_state | DISPATCH_GROUP_HAS_WAITERS;
    		if (unlikely(old_state & DISPATCH_GROUP_HAS_WAITERS)) {
    			os_atomic_rmw_loop_give_up(break);
    		}
    	});
    
    	return _dispatch_group_wait_slow(dg, _dg_state_gen(new_state), timeout);
    }
    

    If the group's value is back at its original (all enters balanced, i.e. the masked state is 0), every task has completed and 0 is returned immediately; if timeout is 0, it also returns right away with a timeout result. Otherwise _dispatch_group_wait_slow is called.
    _dispatch_group_wait_slow keeps waiting until the tasks complete and then returns 0; if they never complete, it returns a timeout result.
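    Putting these pieces together, a typical usage sketch (queue choice and task bodies are arbitrary):

    dispatch_group_t group = dispatch_group_create();
    dispatch_queue_t queue = dispatch_get_global_queue(0, 0);

    dispatch_group_async(group, queue, ^{ NSLog(@"task 1"); }); // enter/leave handled internally

    dispatch_group_enter(group);   // manual pairing for APIs that aren't group-aware
    dispatch_async(queue, ^{
        NSLog(@"task 2");
        dispatch_group_leave(group);
    });

    dispatch_group_notify(group, dispatch_get_main_queue(), ^{
        NSLog(@"all done");        // fires once every enter is balanced by a leave
    });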

    _dispatch_group_wake

    The main thing this function does is loop over, and asynchronously dispatch, the callbacks registered via the notify function.

    static void
    _dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
    {
    	uint16_t refs = needs_release ? 1 : 0; // 
    	if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
    		dispatch_continuation_t dc, next_dc, tail;
    		// Snapshot before anything is notified/woken 
    		dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
    		do {
    			dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
    			next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
    			_dispatch_continuation_async(dsn_queue, dc,
    					_dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
    			_dispatch_release(dsn_queue);
    		} while ((dc = next_dc));
    		refs++;
    	}
    	if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
    		_dispatch_wake_by_address(&dg->dg_gen);
    	}
    	if (refs) _dispatch_release_n(dg, refs);
    }
    
  • Original article: https://blog.csdn.net/weixin_52192405/article/details/125471159