Swift教程

iOS GCD源码浅析

本文主要是介绍iOS GCD源码浅析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

一、GCD简介

什么是 GCDGCD(Grand Central Dispatch) 是异步执行任务的技术之一。我们只需要将定义的任务追加到适当的 Dispatch Queue 中,GCD 就能帮我们生成必要的线程并执行我们的任务而且不需要编写任何线程管理代码。因此使用 GCDvery easy 的,首先创建队列,然后将任务添加到队列,最后指定执行任务函数,搞定收工。

// 创建同步队列
dispatch_queue_t queue = dispatch_queue_create("cc", DISPATCH_QUEUE_SERIAL);
// 添加任务
dispatch_block_t block = ^{
    NSLog(@"hello word");
};
// 指定执行任务函数,依赖队列执行
dispatch_async(queue, block);
复制代码

但是 GCD 又是否真的如此简单呢?有兴趣的话就一起去 GCD 的源码看看呗,看看又不花钱。

二、开启 GCD 的大门

首先,创建两个常见的队列,串行队列和并发队列,然后分别 po 一下里面都有什么东东,再顺便把主队列和全局队列分别一起给看了,先对队列里面的东西有一丢丢的印象就可以了。

貌似发现点了什么,主队列的 width 和串行队列的 width 是一样的,其他的也有很多相同的,难怪说主队列是特殊的串行队列。但是并发队列和全局队列的 width 却相差 1,有点奇怪。接下来我们就到源码中一探究竟吧。在此附上 libdispatch源码链接,希望大家能一起开开心心看源码,本文使用的是 libdispatch-1008.220.2 版本。

1. dispatch_queue_create 源码分析

来到源码之后,当然是全局搜索快速定位 dispatch_queue_create 位置,看它到底在里面做了什么坏事,然后发现队列是通过 _dispatch_lane_create_with_target 创建的,常规接口隔离操作。_dispatch_lane_create_with_target 第一句代码就是将我们的外界传进来区分串行还是并发的参数传进去创建了一个 dispatch_queue_attr_info_t 类型的结构体,有鬼。

dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);
复制代码

一查 dispatch_queue_attr_info_t 是一个结构体位域,结构体位域可以通过一些位运算取出我们想要的内容,过滤掉我们不想要的数据。

typedef struct dispatch_queue_attr_info_s {
	dispatch_qos_t dqai_qos : 8;
	int      dqai_relpri : 8;
	uint16_t dqai_overcommit:2;
	uint16_t dqai_autorelease_frequency:2;
	uint16_t dqai_concurrent:1;
	uint16_t dqai_inactive:1;
} dispatch_queue_attr_info_t;
复制代码

接下来我们来看一下是怎么创建这个结构体的吧。

dispatch_queue_attr_info_t
_dispatch_queue_attr_to_info(dispatch_queue_attr_t dqa)
{
    dispatch_queue_attr_info_t dqai = { };
    // 串行队列直接返回空的 dqai 结构体
    if (!dqa) return dqai;
    
    #if DISPATCH_VARIANT_STATIC
    if (dqa == &_dispatch_queue_attr_concurrent) {
    	dqai.dqai_concurrent = true;
    	return dqai;
    }
    #endif
    if (dqa < _dispatch_queue_attrs ||
    		dqa >= &_dispatch_queue_attrs[DISPATCH_QUEUE_ATTR_COUNT]) {
    	DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
    }
    
    // 苹果的算法
    size_t idx = (size_t)(dqa - _dispatch_queue_attrs);
    
    // 并发队列结构体位域的默认配置和赋值
    dqai.dqai_inactive = (idx % DISPATCH_QUEUE_ATTR_INACTIVE_COUNT);
    idx /= DISPATCH_QUEUE_ATTR_INACTIVE_COUNT;
    dqai.dqai_concurrent = !(idx % DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT);
    idx /= DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT;
    dqai.dqai_relpri = -(idx % DISPATCH_QUEUE_ATTR_PRIO_COUNT);
    idx /= DISPATCH_QUEUE_ATTR_PRIO_COUNT;
    dqai.dqai_qos = idx % DISPATCH_QUEUE_ATTR_QOS_COUNT;
    idx /= DISPATCH_QUEUE_ATTR_QOS_COUNT;
    dqai.dqai_autorelease_frequency =
    		idx % DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY_COUNT;
    idx /= DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY_COUNT;
    dqai.dqai_overcommit = idx % DISPATCH_QUEUE_ATTR_OVERCOMMIT_COUNT;
    idx /= DISPATCH_QUEUE_ATTR_OVERCOMMIT_COUNT;
    return dqai;
}
复制代码

从上面可以发现串行队列就是一个空的结构体,并发队列会对结构体进行一系列的默认配置和赋值,重点关注 dqai.dqai_concurrent 等一些 属性都是有值的,这也会是我们下面的源码的一个分水岭。接着继续往下分析,在忽略了一些代码之后,终于发现了重点所在 vtable,这个应该一个表,然后我们来看看它都做了些什么。

const void *vtable;
if (dqai.dqai_concurrent) {
    // 通过 dqai.dqai_concurrent 来区分并发和串行
    vtable = DISPATCH_VTABLE(queue_concurrent);
} else {
    vtable = DISPATCH_VTABLE(queue_serial);
}
复制代码

创建了 vtable 之后再通过 vtable 开辟内存,生成响应的对象 queue

dispatch_lane_t dq = _dispatch_object_alloc(vtable,
		sizeof(struct dispatch_lane_s));

_dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
		DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
		(dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));
复制代码

这个 init 构造函数的第三个参数是个三目运算符,如果是并发队列是一个宏定义,串行队列是 1,然后再搜搜这个宏定义发现居然是 #define DISPATCH_QUEUE_WIDTH_MAX (0x1000ull - 2),妈耶,这不就是上面我们打印串行队列和并发队列的 width 的值嘛!原来就是在这初始化的。那为啥是减 2 而不是减 1 呢?减 1 就是我们的全局并发队列!

// 验证
#define DISPATCH_QUEUE_WIDTH_POOL (0x1000ull - 1)
struct dispatch_queue_global_s _dispatch_mgr_root_queue = {
	DISPATCH_GLOBAL_OBJECT_HEADER(queue_global),
	.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
	.do_ctxt = &_dispatch_mgr_root_queue_pthread_context,
	.dq_label = "com.apple.root.libdispatch-manager",
	.dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL),
	.dq_priority = DISPATCH_PRIORITY_FLAG_MANAGER |
			DISPATCH_PRIORITY_SATURATED_OVERRIDE,
	.dq_serialnum = 3,
	.dgq_thread_pool_size = 1,
};
复制代码

好,接着往下看,又找到了 dq->do_targetq = tq;,这个 tq 又是在哪赋值的呢?我在上面找到了赋值代码。

#define DISPATCH_QOS_UNSPECIFIED        ((dispatch_qos_t)0)
#define DISPATCH_QOS_DEFAULT            ((dispatch_qos_t)4)
// Serial queues default to overcommit!
// 如果是并发 overcommit = _dispatch_queue_attr_overcommit_disabled
// 如果是串行 overcommit = _dispatch_queue_attr_overcommit_enabled
overcommit = dqai.dqai_concurrent ?
		_dispatch_queue_attr_overcommit_disabled :
		_dispatch_queue_attr_overcommit_enabled;

if (!tq) {
    tq = _dispatch_get_root_queue(
	qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos, // 4
	overcommit == _dispatch_queue_attr_overcommit_enabled)->_as_dq; // 0 1
}

static inline dispatch_queue_global_t
_dispatch_get_root_queue(dispatch_qos_t qos, bool overcommit)
{
	if (unlikely(qos < DISPATCH_QOS_MIN || qos > DISPATCH_QOS_MAX)) {
		DISPATCH_CLIENT_CRASH(qos, "Corrupted priority");
	}
	// qos 为 4,4-1= 3
	// 2*3 + 0或者1 = 6/7
	// 然后再去数组 _dispatch_root_queues 里取数组的 6 或者 7 的下标指针地址
	return &_dispatch_root_queues[2 * (qos - 1) + overcommit];
}
复制代码

由于 tq 的值是通过 _dispatch_root_queues 数组取出来的,直接到数组里面看就一目了然了。我就只列举了串行队列、并发队列(全局和并发是一样的)和主队列的。由此可以发现 tq 就是 dq_label 的值,也就是外面队列 target 的值。

_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
	.dq_label = "com.apple.root.utility-qos.overcommit",
	.dq_serialnum = 9,
),
_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_FALLBACK,
	.dq_label = "com.apple.root.default-qos",
	.dq_serialnum = 10,
),
_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT,
		DISPATCH_PRIORITY_FLAG_FALLBACK | DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
	.dq_label = "com.apple.root.default-qos.overcommit",
	.dq_serialnum = 11,
),
复制代码

既然串行队列和并发队列的 target 信息是从 _dispatch_root_queues 结构体数组取出来的,那么 _dispatch_root_queues 又是在哪创建的呢?我们来到最先初始化的 libdispatcdispatch_queue_createh_init 里的查找,最终在 _dispatch_introspection_init 找到了一些代码。

队列是通过 for 循环,调用 _dispatch_trace_queue_create,再取出 _dispatch_root_queues 里的地址指针一个一个创建出来的。

  • 小扩展:在 allocinit 里面的源码我都发现了 dispatch_object_t 这个类型,然后一搜居然是一个联合体,这个就有意思了,通过联合体同一时间只能一个有值的特性给函数带来了多态性。
typedef union {
	struct _os_object_s *_os_obj;
	struct dispatch_object_s *_do;
	struct dispatch_queue_s *_dq;
	struct dispatch_queue_attr_s *_dqa;
	struct dispatch_group_s *_dg;
	struct dispatch_source_s *_ds;
	struct dispatch_mach_s *_dm;
	struct dispatch_mach_msg_s *_dmsg;
	struct dispatch_semaphore_s *_dsema;
	struct dispatch_data_s *_ddata;
	struct dispatch_io_s *_dchannel;
} dispatch_object_t DISPATCH_TRANSPARENT_UNION;
复制代码

到此,GCD 的大门就已经打开,接下来我们来点深入一点的东西,也让大家不虚此行。

三、GCD 深入了解

当我们了解队列的创建之后,添加任务就是一个 blockblock 参数的类型是 dispatch_block_t,它是一个没有参数,没有返回值的 block:typedef void (^dispatch_block_t)(void); 没什么太多的好研究的,接下来就来看看执行任务的函数都在底层做了些什么吧。接下来的内容会比较的干巴巴,但是看的越深,对自己帮助会更大。

1.执行任务的函数

执行任务的函数分为两种,同步和异步函数(即同步线程和异步线程)。

  • 同步函数:即 dispatch_sync,必须等待当前语句执行完毕,才会执行下一条语 句,不会开启线程,在当前执行任务。
  • 异步函数,即 dispatch_async。不用等待当前语句执行完毕,就可以执行下一条语句,会开启线程执行任务。

2. dispatch_sync 源码分析

在分析同步函数源码之前,先提出两个疑问,同步是怎么执行的呢?死锁是在同步的情况下造成的,那底层源码又是怎么样的呢?带着这两个疑问点我们接着分析dispatch_sync 源码。 我们来到 dispatch_sync 函数实现:

void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
    uintptr_t dc_flags = DC_FLAG_BLOCK;
    if (unlikely(_dispatch_block_has_private_data(work))) {
    	return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
    }
    _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
复制代码

dispatch_sync 里的 unlikely 的意思是基本上不会走,然后就来到 _dispatch_sync_f 函数,_dispatch_sync_f 的第三个参数是将 block 包装了一下,具体包装如下:

#define _dispatch_Block_invoke(bb) \
		((dispatch_function_t)((struct Block_layout *)bb)->invoke)
复制代码

省略各种分支之后就会来到 _dispatch_sync_f_inline 函数,实现如下:

static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)
{
    // 串行就会走这下面
    if (likely(dq->dq_width == 1)) {
    	return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
    }
        // ... 下面代码暂时省略
}
复制代码

经常层层难关,发现串行队列好像就是调用了 _dispatch_barrier_sync_f 函数,看到这个函数是不是想起了一些什么,这是不是就是栅栏函数,那分析完同步之后,栅栏也就迎刃而解了,爽歪歪。继续往里分析,最终来到 _dispatch_barrier_sync_f_inline:

static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func, uintptr_t dc_flags)
{
    // 获取线程ID
    dispatch_tid tid = _dispatch_tid_self();
    
    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
    	DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }
    dispatch_lane_t dl = upcast(dq)._dl;
    
    // 死锁
    if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
    	return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
    			DC_FLAG_BARRIER | dc_flags);
    }
    
    if (unlikely(dl->do_targetq->do_targetq)) {
    	return _dispatch_sync_recurse(dl, ctxt, func,
    			DC_FLAG_BARRIER | dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
    		DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
    				dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
复制代码

在这个函数里会先获取线程 id,因为队列需要绑定到线程然后依赖执行,而死锁的原因在于同步线程里的任务出现你等我,我等你的现象,所以只有 _dispatch_queue_try_acquire_barrier_sync 用到了线程 id,在里面果不其然发现了秘密:

static inline bool
_dispatch_queue_try_acquire_barrier_sync_and_suspend(dispatch_lane_t dq,
		uint32_t tid, uint64_t suspend_count)
{
    uint64_t init  = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
    uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
    		_dispatch_lock_value_from_tid(tid) |
    		(suspend_count * DISPATCH_QUEUE_SUSPEND_INTERVAL);
    uint64_t old_state, new_state;
    // 从 os 底层获取信息,也就是通过线程和当前队列获取 new_state 返回出去
    return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
    	uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
    	if (old_state != (init | role)) {
    		os_atomic_rmw_loop_give_up(break);
    	}
    	new_state = value | role;
    });
}
复制代码

os 底层获取到了一个 new_state 之后,就会执行 _dispatch_sync_f_slow,如果你遇见过死锁,对这个函数肯定不陌生吧,咱们就来看看里面到底做了什么,是不是有点迫不及待!

static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
		dispatch_function_t func, uintptr_t top_dc_flags,
		dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
	dispatch_queue_t top_dq = top_dqu._dq;
	dispatch_queue_t dq = dqu._dq;
	if (unlikely(!dq->do_targetq)) {
		return _dispatch_sync_function_invoke(dq, ctxt, func);
	}

	pthread_priority_t pp = _dispatch_get_priority();
	// 初始化保存 block 以及其他信息的结构体
	struct dispatch_sync_context_s dsc = {
		.dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
		.dc_func     = _dispatch_async_and_wait_invoke,
		.dc_ctxt     = &dsc,
		.dc_other    = top_dq,
		.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
		.dc_voucher  = _voucher_get(),
		.dsc_func    = func,
		.dsc_ctxt    = ctxt,
		.dsc_waiter  = _dispatch_tid_self(),
	};
	// 将 block push 到 queue 里面去
	_dispatch_trace_item_push(top_dq, &dsc);
	// 死锁的最终函数
	__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);

	// ... 忽略了一些暂时不需要分析的代码
}
复制代码

通过 _dispatch_trace_item_push 函数可以发现队列其实就是一个用来提交 block 的对象,当 block push 到队列中后,将按照 先入先出(FIFO) 的顺序进行处理,系统在 GCD 的底层会维护一个线程池,用来执行这些 block。那这些 block 又是在哪调用的呢? 由于步骤有点多,我就不一一分析了,把大致的流程图写在下面,大家可以自己去跟一下,体验一下源码的乐趣。

dispatch_sync  
└──_dispatch_sync_f
    └──_dispatch_sync_invoke_and_complete
        └──_dispatch_sync_function_invoke_inline
           └──_dispatch_client_callout
              └──f(ctxt);
复制代码

第一个同步是怎么执行的问题解决了,接下来解决第二个死锁问题,要想解决这个问题,就需要来到 __DISPATCH_WAIT_FOR_QUEUE__ 里一探究竟。

static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
    // 获取队列的状态,看是否是处于等待状态
    uint64_t dq_state = _dispatch_wait_prepare(dq);
    if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
    	DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
    			"dispatch_sync called on queue "
    			"already owned by current thread");
    }
    // ... 忽略了一些暂时不需要分析的代码
}
复制代码

获取到队列的状态之后,又调用 _dq_state_drain_locked_by 函数最后来到下面这个函数,这个函数就是死锁的关键所在。

static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{   // lock_value 为队列状态,tid 为线程 id
    // ^ (异或运算法) 两个相同就会出现 0 否则为1
    return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}
复制代码

从上面可以看出,如果队列和线程都是处于等待状态,最终的返回结果为 YES,就会来到 DISPATCH_CLIENT_CRASH 造成 crash

3. dispatch_async 源码分析

分析完同步函数之后,接下来就该来分析分析异步函数了,异步函数会需要开启线程去执行任务,所以这应该会是一个重点,找到重点之后我们就可以继续研究了,一样先来到异步函数的实现:

void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	uintptr_t dc_flags = DC_FLAG_CONSUME;
	dispatch_qos_t qos;

	qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
	_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
复制代码

异步函数会通过 _dispatch_continuation_init 先对 block 进行包装即函数式保存,因为异步需要在合适的时机进行线程回调 block ,之后就调用 _dispatch_continuation_async 进行异步处理,由于异步的分支比较多,我下面列出我所分析的步骤:

dispatch_async  
└──_dispatch_continuation_async
    └──dx_push
        └──dq_push
           └──_dispatch_root_queue_push
              └──_dispatch_root_queue_push_inline
                 └──_dispatch_root_queue_poke
                    └──_dispatch_root_queue_poke_slow
复制代码

_dispatch_root_queue_poke_slow 里就能找到开辟线程相关的代码了,那我们就来分析一波吧:

static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
    int remaining = n;
    int r = ENOSYS;
    
    _dispatch_root_queues_init();
    _dispatch_debug_root_queue(dq, __func__);
    _dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);
    #if !DISPATCH_USE_INTERNAL_WORKQUEUE
    #if DISPATCH_USE_PTHREAD_ROOT_QUEUES
    if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
    #endif
    {
    	_dispatch_root_queue_debug("requesting new worker thread for global "
    			"queue: %p", dq);
    	r = _pthread_workqueue_addthreads(remaining,
    			_dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
    	(void)dispatch_assume_zero(r);
    	return;
    }
    #endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
    #if DISPATCH_USE_PTHREAD_POOL
    dispatch_pthread_root_queue_context_t pqc = dq->do_ctxt;
    if (likely(pqc->dpq_thread_mediator.do_vtable)) {
    	while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
    		_dispatch_root_queue_debug("signaled sleeping worker for "
    				"global queue: %p", dq);
    		if (!--remaining) {
    			return;
    		}
    	}
    }
    
    bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    if (overcommit) {
        // 串行队列
    	os_atomic_add2o(dq, dgq_pending, remaining, relaxed);
    } else {
    	if (!os_atomic_cmpxchg2o(dq, dgq_pending, 0, remaining, relaxed)) {
    		_dispatch_root_queue_debug("worker thread request still pending for "
    				"global queue: %p", dq);
    		return;
    	}
    }
    
    // floor 为 0,remaining 是根据队列任务的情况处理的
    int can_request, t_count;
    // 获取线程池的大小
    t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
    do {
        // 计算可以请求的数量
        can_request = t_count < floor ? 0 : t_count - floor;
        if (remaining > can_request) {
    	    _dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
    	    		remaining, can_request);
    	    os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
    	    remaining = can_request;
        }
        if (remaining == 0) {
            // 线程池满了,就会报出异常的情况
            _dispatch_root_queue_debug("pthread pool is full for root queue: "
    				"%p", dq);
                return;
    	}
    } while (!os_atomic_cmpxchgvw2o(dq, dgq_thread_pool_size, t_count,
    		t_count - remaining, &t_count, acquire));
    
    pthread_attr_t *attr = &pqc->dpq_thread_attr;
    pthread_t tid, *pthr = &tid;
    #if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
    if (unlikely(dq == &_dispatch_mgr_root_queue)) {
    	pthr = _dispatch_mgr_root_queue_init();
    }
    #endif
    do {
    	_dispatch_retain(dq); 
    	// 开辟线程
    	while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
    		if (r != EAGAIN) {
    			(void)dispatch_assume_zero(r);
    		}
    		_dispatch_temporary_resource_shortage();
    	}
    } while (--remaining);
    #else
    (void)floor;
    #endif // DISPATCH_USE_PTHREAD_POOL
}
复制代码

好了,同步和异步暂时分析到这,接下来分析的内容相对比较简单,semaphoregroup

4. semaphore 源码分析

关于信号量的 API 不多,主要是三个,createwaitsignal。 信号量在初始化时要指定 value,随后内部将这个 value 存储起来。实际操作时会存两个 value,一个是当前的value,一个是记录初始 value。信号的 waitsignal 是互逆的两个操作,如果 value 大于 0,前者将 value 减一,此时如果 value 小于零就一直等待。初始 value 必须大于等于 0,如果为 0 并随后调用 wait 方法,线程将被阻塞直到别的线程调用了 signal 方法。简单的介绍了一下使用方法,接下来直接来看源码吧。

(1) dispatch_semaphore_create

这个函数就是去初始化 dsema,并设置 value 的值。

dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
    dispatch_semaphore_t dsema;
    // 如果 value 小于 0 直接返回 0
    if (value < 0) {
    	return DISPATCH_BAD_INPUT;
    }
    dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
    		sizeof(struct dispatch_semaphore_s));
    dsema->do_next = DISPATCH_OBJECT_LISTLESS;
    dsema->do_targetq = _dispatch_get_default_queue(false);
    dsema->dsema_value = value;
    _dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
    dsema->dsema_orig = value;
    return dsema;
}
复制代码
(2) dispatch_semaphore_wait
long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
    long value = os_atomic_dec2o(dsema, dsema_value, acquire);
    if (likely(value >= 0)) {
	    return 0;
    }
    return _dispatch_semaphore_wait_slow(dsema, timeout);
}
复制代码

dispatch_atomic_dec2o 是一个宏,会调用 GCC 内置的函数 __sync_sub_and_fetch,实现减法的原子性操作。因此这一行的意思是将 dsema 的值减一,并把新的值赋给 value。如果减一后的 value 大于等于 0 就立刻返回,没有任何操作,否则进入等待状态。

_dispatch_semaphore_wait_slow 函数针对不同的 timeout 参数,分了三种情况考虑:

case DISPATCH_TIME_NOW:
    orig = dsema->dsema_value;
    while (orig < 0) {
    	if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
    			&orig, relaxed)) {
    		return _DSEMA4_TIMEOUT();
    	}
    }
复制代码

这种情况 while 判断一定会成立,因为如果 value 大于等于 0,在上一个函数 dispatch_semaphore_wait 中就已经返回了,判断成立,内部的 if 判断一定也成立,此时会将 value 加一(也就是变为 0) 并返回。加一的原因是为了抵消 wait 函数一开始的减一操作。此时函数调用方会得到返回值 KERN_OPERATION_TIMED_OUT,表示由于等待时间超时而返回。

第二种情况是 DISPATCH_TIME_FOREVER 这个 case:

case DISPATCH_TIME_FOREVER:
    _dispatch_sema4_wait(&dsema->dsema_sema);
    break;
}
复制代码

这种情况会调用系统的 semaphore_wait 方法继续等待,直到收到 signal 调用。

第三种就是一个默认的情况:

default:
    if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
        break;
    }
复制代码

default 分支下,我们指定一个超时时间,在 _dispatch_sema4_timedwait 里面会去判断当前操作是否超时,这和 DISPATCH_TIME_FOREVER 的处理比较类似,不同的是我们调用了内核提供的 semaphore_timedwait 方法可以指定超时时间。

(3) dispatch_semaphore_signal

这个函数的实现相对来说比较简单,因为它不需要阻塞,只用唤醒。

long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
    long value = os_atomic_inc2o(dsema, dsema_value, release);
    if (likely(value > 0)) {
    	return 0;
    }
    if (unlikely(value == LONG_MIN)) {
    	DISPATCH_CLIENT_CRASH(value,
    			"Unbalanced call to dispatch_semaphore_signal()");
    }
    return _dispatch_semaphore_signal_slow(dsema);
}
复制代码

5. group 源码分析

dispatch_group_create

当我们了解了信号量之后,group 是一个非常容易理解的概念,我们先看看如何创建 group:

dispatch_group_t
dispatch_group_create(void)
{
    return _dispatch_group_create_with_count(0);
}

static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
    dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
    		sizeof(struct dispatch_group_s));
    dg->do_next = DISPATCH_OBJECT_LISTLESS;
    dg->do_targetq = _dispatch_get_default_queue(false);
    if (n) {
    	os_atomic_store2o(dg, dg_bits,
    			-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
    	os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
    }
    return dg;
}
复制代码

这个方法就是开辟了内存空间,但是从 os_atomic_store2o 可以看出 group 底层也维护了一个 value 值。

dispatch_group_enter

enter 可以看出,当进组的时候会通过 os_atomic_sub_orig2ovalue 减 4。

void
dispatch_group_enter(dispatch_group_t dg)
{
    uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
    		DISPATCH_GROUP_VALUE_INTERVAL, acquire);
    uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
    if (unlikely(old_value == 0)) {
    	_dispatch_retain(dg); // <rdar://problem/22318411>
    }
    if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
    	DISPATCH_CLIENT_CRASH(old_bits,
    			"Too many nested calls to dispatch_group_enter()");
    }
}
复制代码
dispatch_group_leave

出组的时候会对 value 进行加值,如果 new_stateold_state 相等,就会调用 _dispatch_group_wake 继续后面代码的执行,这个函数后面会说到。

void
dispatch_group_leave(dispatch_group_t dg)
{   
    uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
    		DISPATCH_GROUP_VALUE_INTERVAL, release);
    uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
    
    if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
    	old_state += DISPATCH_GROUP_VALUE_INTERVAL;
    	do {
    		new_state = old_state;
    		if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
    			new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
    			new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
    		} else {
    			new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
    		}
    		if (old_state == new_state) break;
    	} while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
    			old_state, new_state, &old_state, relaxed)));
    	return _dispatch_group_wake(dg, old_state, true);
    }
    
    if (unlikely(old_value == 0)) {
    	DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
    			"Unbalanced call to dispatch_group_leave()");
    }
}
复制代码
dispatch_group_async

dispatch_group_async 就是对 enterleave 的封装,当 block 调用完成之后进行 callout 之后就出组了。

void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_block_t db)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
    dispatch_qos_t qos;
    // 保存任务 
    qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
    _dispatch_continuation_group_async(dg, dq, dc, qos);
}

static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_continuation_t dc, dispatch_qos_t qos)
{   // 进组
    dispatch_group_enter(dg);
    dc->dc_data = dg;
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
    struct dispatch_object_s *dou = dc->dc_data;
    unsigned long type = dx_type(dou);
    if (type == DISPATCH_GROUP_TYPE) {
    	_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
    	_dispatch_trace_item_complete(dc);
    	// 出组
    	dispatch_group_leave((dispatch_group_t)dou);
    } else {
    	DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
    }
}
复制代码
dispatch_group_wait

这个方法用于等待 group 中所有任务执行完成,可以理解为信号量 wait 的封装:

long
dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout)
{
    uint64_t old_state, new_state;
    
    os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, relaxed, {
    	if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
    		os_atomic_rmw_loop_give_up_with_fence(acquire, return 0);
    	}
    	if (unlikely(timeout == 0)) {
    		os_atomic_rmw_loop_give_up(return _DSEMA4_TIMEOUT());
    	}
    	new_state = old_state | DISPATCH_GROUP_HAS_WAITERS;
    	if (unlikely(old_state & DISPATCH_GROUP_HAS_WAITERS)) {
    		os_atomic_rmw_loop_give_up(break);
    	}
    });
    
    return _dispatch_group_wait_slow(dg, _dg_state_gen(new_state), timeout);
}
复制代码

如果当前 value 和原始 value 相同,表明任务已经全部完成,直接返回 0,如果 timeout 为 0 也会立刻返回,否则调用 _dispatch_group_wait_slow。 在 _dispatch_group_wait_slow 会一直等到任务完成返回 0 ,当然如果一直没有完成就会返回 timeout

_dispatch_group_wake

这个函数主要做的就是循环调用 dispatch_async_f 异步执行在 notify 函数中注册的回调。

static void
_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
    uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>
    
    if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
    	dispatch_continuation_t dc, next_dc, tail;
    
    	// Snapshot before anything is notified/woken
    	dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
    	do {
    		dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
    		next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
    		_dispatch_continuation_async(dsn_queue, dc,
    				_dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
    		_dispatch_release(dsn_queue);
    	} while ((dc = next_dc));
    
    	refs++;
    }
    if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
    	_dispatch_wake_by_address(&dg->dg_gen);
    }
    if (refs) _dispatch_release_n(dg, refs);
}
复制代码

6. 小扩展: dispatch_once 的源码分析

dispatch_once 仅仅是一个包装,内部直接调用了 dispatch_once_f:

void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
    dispatch_once_gate_t l = (dispatch_once_gate_t)val;
    #if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
    if (likely(v == DLOCK_ONCE_DONE)) {
    	return;
    }
    #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    if (likely(DISPATCH_ONCE_IS_GEN(v))) {
    	return _dispatch_once_mark_done_if_quiesced(l, v);
    }
    #endif
    #endif
    if (_dispatch_once_gate_tryenter(l)) {
    	return _dispatch_once_callout(l, ctxt, func);
    }
    return _dispatch_once_wait(l);
}
复制代码

第一次调用时外部传进来的 onceToken 还是空指针,所以 valNULL_dispatch_once_gate_tryenter(l) 判断 l->dgo_once 是否标记为 DLOCK_ONCE_UNLOCKED,但是 #define DLOCK_ONCE_UNLOCKED ((uintptr_t)0),所以 if 判断是成立的,就会进行 block 回调,然后在通过 _dispatch_once_gate_broadcastl->dgo_once 标记为 DLOCK_ONCE_DONE,下次再进来的时候就会直接返回保证代码只执行一次。

四、总结

本文主要整理了 GCD 中常见的 API 以及底层的实现原理。

  • dispatch_sync 将任务 block 通过 push 到队列中,然后按照 FIFO 去执行。
  • dispatch_async 会把任务包装并保存,之后就会开辟相应线程去执行已保存的任务。
  • semaphore 主要使用 signalwait 这两个接口,底层分别调用了内核提供的方法。在调用 signal 方法后,先将 value 减一,如果大于零立刻返回,否则陷入等待。signal 方法将信号量加一,如果 value 大于零立刻返回,否则说明唤醒了某一个等待线程,此时由系统决定哪个线程的等待方法可以返回。
  • dispatch_group 底层也是维护了一个 value 的值,等待 group 完成实际上就是等待 value 恢复初始值。而 notify 的作用是将所有注册的回调组装成一个链表,在 dispatch_async 完成时判断 value 是不是恢复初始值,如果是则调用 dispatch_async 异步执行所有注册的回调。
  • dispatch_once 通过一个静态变量来标记 block 是否已被执行,同时使用加锁确保只有一个线程能执行,执行完 block 后会唤醒其他所有等待的线程。

最后在此感谢 深入了解GCD 对我的启发。

这篇关于iOS GCD源码浅析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!