什么是 GCD
?GCD(Grand Central Dispatch)
是异步执行任务的技术之一。我们只需要将定义的任务追加到适当的 Dispatch Queue
中,GCD
就能帮我们生成必要的线程并执行我们的任务而且不需要编写任何线程管理代码。因此使用 GCD
是 very easy
的,首先创建队列,然后将任务添加到队列,最后指定执行任务函数,搞定收工。
// 创建同步队列 dispatch_queue_t queue = dispatch_queue_create("cc", DISPATCH_QUEUE_SERIAL); // 添加任务 dispatch_block_t block = ^{ NSLog(@"hello word"); }; // 指定执行任务函数,依赖队列执行 dispatch_async(queue, block); 复制代码
但是 GCD
又是否真的如此简单呢?有兴趣的话就一起去 GCD
的源码看看呗,看看又不花钱。
首先,创建两个常见的队列,串行队列和并发队列,然后分别 po
一下里面都有什么东东,再顺便把主队列和全局队列分别一起给看了,先对队列里面的东西有一丢丢的印象就可以了。
width
和串行队列的 width
是一样的,其他的也有很多相同的,难怪说主队列是特殊的串行队列。但是并发队列和全局队列的 width
却相差 1,有点奇怪。接下来我们就到源码中一探究竟吧。在此附上 libdispatch源码链接,希望大家能一起开开心心看源码,本文使用的是 libdispatch-1008.220.2
版本。
来到源码之后,当然是全局搜索快速定位 dispatch_queue_create
位置,看它到底在里面做了什么坏事,然后发现队列是通过 _dispatch_lane_create_with_target
创建的,常规接口隔离操作。_dispatch_lane_create_with_target
第一句代码就是将我们的外界传进来区分串行还是并发的参数传进去创建了一个 dispatch_queue_attr_info_t
类型的结构体,有鬼。
dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa); 复制代码
一查 dispatch_queue_attr_info_t
是一个结构体位域,结构体位域可以通过一些位运算取出我们想要的内容,过滤掉我们不想要的数据。
typedef struct dispatch_queue_attr_info_s { dispatch_qos_t dqai_qos : 8; int dqai_relpri : 8; uint16_t dqai_overcommit:2; uint16_t dqai_autorelease_frequency:2; uint16_t dqai_concurrent:1; uint16_t dqai_inactive:1; } dispatch_queue_attr_info_t; 复制代码
接下来我们来看一下是怎么创建这个结构体的吧。
dispatch_queue_attr_info_t _dispatch_queue_attr_to_info(dispatch_queue_attr_t dqa) { dispatch_queue_attr_info_t dqai = { }; // 串行队列直接返回空的 dqai 结构体 if (!dqa) return dqai; #if DISPATCH_VARIANT_STATIC if (dqa == &_dispatch_queue_attr_concurrent) { dqai.dqai_concurrent = true; return dqai; } #endif if (dqa < _dispatch_queue_attrs || dqa >= &_dispatch_queue_attrs[DISPATCH_QUEUE_ATTR_COUNT]) { DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute"); } // 苹果的算法 size_t idx = (size_t)(dqa - _dispatch_queue_attrs); // 并发队列结构体位域的默认配置和赋值 dqai.dqai_inactive = (idx % DISPATCH_QUEUE_ATTR_INACTIVE_COUNT); idx /= DISPATCH_QUEUE_ATTR_INACTIVE_COUNT; dqai.dqai_concurrent = !(idx % DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT); idx /= DISPATCH_QUEUE_ATTR_CONCURRENCY_COUNT; dqai.dqai_relpri = -(idx % DISPATCH_QUEUE_ATTR_PRIO_COUNT); idx /= DISPATCH_QUEUE_ATTR_PRIO_COUNT; dqai.dqai_qos = idx % DISPATCH_QUEUE_ATTR_QOS_COUNT; idx /= DISPATCH_QUEUE_ATTR_QOS_COUNT; dqai.dqai_autorelease_frequency = idx % DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY_COUNT; idx /= DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY_COUNT; dqai.dqai_overcommit = idx % DISPATCH_QUEUE_ATTR_OVERCOMMIT_COUNT; idx /= DISPATCH_QUEUE_ATTR_OVERCOMMIT_COUNT; return dqai; } 复制代码
从上面可以发现串行队列就是一个空的结构体,并发队列会对结构体进行一系列的默认配置和赋值,重点关注 dqai.dqai_concurrent
等一些 属性都是有值的,这也会是我们下面的源码的一个分水岭。接着继续往下分析,在忽略了一些代码之后,终于发现了重点所在 vtable
,这个应该一个表,然后我们来看看它都做了些什么。
const void *vtable; if (dqai.dqai_concurrent) { // 通过 dqai.dqai_concurrent 来区分并发和串行 vtable = DISPATCH_VTABLE(queue_concurrent); } else { vtable = DISPATCH_VTABLE(queue_serial); } 复制代码
创建了 vtable
之后再通过 vtable
开辟内存,生成响应的对象 queue
。
dispatch_lane_t dq = _dispatch_object_alloc(vtable, sizeof(struct dispatch_lane_s)); _dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ? DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER | (dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0)); 复制代码
这个 init
构造函数的第三个参数是个三目运算符,如果是并发队列是一个宏定义,串行队列是 1,然后再搜搜这个宏定义发现居然是 #define DISPATCH_QUEUE_WIDTH_MAX (0x1000ull - 2)
,妈耶,这不就是上面我们打印串行队列和并发队列的 width
的值嘛!原来就是在这初始化的。那为啥是减 2 而不是减 1 呢?减 1 就是我们的全局并发队列!
// 验证 #define DISPATCH_QUEUE_WIDTH_POOL (0x1000ull - 1) struct dispatch_queue_global_s _dispatch_mgr_root_queue = { DISPATCH_GLOBAL_OBJECT_HEADER(queue_global), .dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE, .do_ctxt = &_dispatch_mgr_root_queue_pthread_context, .dq_label = "com.apple.root.libdispatch-manager", .dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL), .dq_priority = DISPATCH_PRIORITY_FLAG_MANAGER | DISPATCH_PRIORITY_SATURATED_OVERRIDE, .dq_serialnum = 3, .dgq_thread_pool_size = 1, }; 复制代码
好,接着往下看,又找到了 dq->do_targetq = tq;
,这个 tq
又是在哪赋值的呢?我在上面找到了赋值代码。
#define DISPATCH_QOS_UNSPECIFIED ((dispatch_qos_t)0) #define DISPATCH_QOS_DEFAULT ((dispatch_qos_t)4) // Serial queues default to overcommit! // 如果是并发 overcommit = _dispatch_queue_attr_overcommit_disabled // 如果是串行 overcommit = _dispatch_queue_attr_overcommit_enabled overcommit = dqai.dqai_concurrent ? _dispatch_queue_attr_overcommit_disabled : _dispatch_queue_attr_overcommit_enabled; if (!tq) { tq = _dispatch_get_root_queue( qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos, // 4 overcommit == _dispatch_queue_attr_overcommit_enabled)->_as_dq; // 0 1 } static inline dispatch_queue_global_t _dispatch_get_root_queue(dispatch_qos_t qos, bool overcommit) { if (unlikely(qos < DISPATCH_QOS_MIN || qos > DISPATCH_QOS_MAX)) { DISPATCH_CLIENT_CRASH(qos, "Corrupted priority"); } // qos 为 4,4-1= 3 // 2*3 + 0或者1 = 6/7 // 然后再去数组 _dispatch_root_queues 里取数组的 6 或者 7 的下标指针地址 return &_dispatch_root_queues[2 * (qos - 1) + overcommit]; } 复制代码
由于 tq
的值是通过 _dispatch_root_queues
数组取出来的,直接到数组里面看就一目了然了。我就只列举了串行队列、并发队列(全局和并发是一样的)和主队列的。由此可以发现 tq
就是 dq_label
的值,也就是外面队列 target
的值。
_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, DISPATCH_PRIORITY_FLAG_OVERCOMMIT, .dq_label = "com.apple.root.utility-qos.overcommit", .dq_serialnum = 9, ), _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_FALLBACK, .dq_label = "com.apple.root.default-qos", .dq_serialnum = 10, ), _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_FALLBACK | DISPATCH_PRIORITY_FLAG_OVERCOMMIT, .dq_label = "com.apple.root.default-qos.overcommit", .dq_serialnum = 11, ), 复制代码
既然串行队列和并发队列的 target
信息是从 _dispatch_root_queues
结构体数组取出来的,那么 _dispatch_root_queues
又是在哪创建的呢?我们来到最先初始化的 libdispatcdispatch_queue_createh_init
里的查找,最终在 _dispatch_introspection_init
找到了一些代码。
for
循环,调用 _dispatch_trace_queue_create
,再取出 _dispatch_root_queues
里的地址指针一个一个创建出来的。
alloc
和 init
里面的源码我都发现了 dispatch_object_t
这个类型,然后一搜居然是一个联合体,这个就有意思了,通过联合体同一时间只能一个有值的特性给函数带来了多态性。typedef union { struct _os_object_s *_os_obj; struct dispatch_object_s *_do; struct dispatch_queue_s *_dq; struct dispatch_queue_attr_s *_dqa; struct dispatch_group_s *_dg; struct dispatch_source_s *_ds; struct dispatch_mach_s *_dm; struct dispatch_mach_msg_s *_dmsg; struct dispatch_semaphore_s *_dsema; struct dispatch_data_s *_ddata; struct dispatch_io_s *_dchannel; } dispatch_object_t DISPATCH_TRANSPARENT_UNION; 复制代码
到此,GCD
的大门就已经打开,接下来我们来点深入一点的东西,也让大家不虚此行。
当我们了解队列的创建之后,添加任务就是一个 block
,block
参数的类型是 dispatch_block_t
,它是一个没有参数,没有返回值的 block
:typedef void (^dispatch_block_t)(void);
没什么太多的好研究的,接下来就来看看执行任务的函数都在底层做了些什么吧。接下来的内容会比较的干巴巴,但是看的越深,对自己帮助会更大。
执行任务的函数分为两种,同步和异步函数(即同步线程和异步线程)。
dispatch_sync
,必须等待当前语句执行完毕,才会执行下一条语
句,不会开启线程,在当前执行任务。dispatch_async
。不用等待当前语句执行完毕,就可以执行下一条语句,会开启线程执行任务。在分析同步函数源码之前,先提出两个疑问,同步是怎么执行的呢?死锁是在同步的情况下造成的,那底层源码又是怎么样的呢?带着这两个疑问点我们接着分析dispatch_sync
源码。
我们来到 dispatch_sync
函数实现:
void dispatch_sync(dispatch_queue_t dq, dispatch_block_t work) { uintptr_t dc_flags = DC_FLAG_BLOCK; if (unlikely(_dispatch_block_has_private_data(work))) { return _dispatch_sync_block_with_privdata(dq, work, dc_flags); } _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags); } 复制代码
dispatch_sync
里的 unlikely
的意思是基本上不会走,然后就来到 _dispatch_sync_f
函数,_dispatch_sync_f
的第三个参数是将 block
包装了一下,具体包装如下:
#define _dispatch_Block_invoke(bb) \ ((dispatch_function_t)((struct Block_layout *)bb)->invoke) 复制代码
省略各种分支之后就会来到 _dispatch_sync_f_inline
函数,实现如下:
static inline void _dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt, dispatch_function_t func, uintptr_t dc_flags) { // 串行就会走这下面 if (likely(dq->dq_width == 1)) { return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags); } // ... 下面代码暂时省略 } 复制代码
经常层层难关,发现串行队列好像就是调用了 _dispatch_barrier_sync_f
函数,看到这个函数是不是想起了一些什么,这是不是就是栅栏函数,那分析完同步之后,栅栏也就迎刃而解了,爽歪歪。继续往里分析,最终来到 _dispatch_barrier_sync_f_inline
:
static inline void _dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt, dispatch_function_t func, uintptr_t dc_flags) { // 获取线程ID dispatch_tid tid = _dispatch_tid_self(); if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) { DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync"); } dispatch_lane_t dl = upcast(dq)._dl; // 死锁 if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) { return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl, DC_FLAG_BARRIER | dc_flags); } if (unlikely(dl->do_targetq->do_targetq)) { return _dispatch_sync_recurse(dl, ctxt, func, DC_FLAG_BARRIER | dc_flags); } _dispatch_introspection_sync_begin(dl); _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop( dq, ctxt, func, dc_flags | DC_FLAG_BARRIER))); } 复制代码
在这个函数里会先获取线程 id
,因为队列需要绑定到线程然后依赖执行,而死锁的原因在于同步线程里的任务出现你等我,我等你的现象,所以只有 _dispatch_queue_try_acquire_barrier_sync
用到了线程 id
,在里面果不其然发现了秘密:
static inline bool _dispatch_queue_try_acquire_barrier_sync_and_suspend(dispatch_lane_t dq, uint32_t tid, uint64_t suspend_count) { uint64_t init = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width); uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER | _dispatch_lock_value_from_tid(tid) | (suspend_count * DISPATCH_QUEUE_SUSPEND_INTERVAL); uint64_t old_state, new_state; // 从 os 底层获取信息,也就是通过线程和当前队列获取 new_state 返回出去 return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, { uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK; if (old_state != (init | role)) { os_atomic_rmw_loop_give_up(break); } new_state = value | role; }); } 复制代码
从 os
底层获取到了一个 new_state
之后,就会执行 _dispatch_sync_f_slow
,如果你遇见过死锁,对这个函数肯定不陌生吧,咱们就来看看里面到底做了什么,是不是有点迫不及待!
static void _dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt, dispatch_function_t func, uintptr_t top_dc_flags, dispatch_queue_class_t dqu, uintptr_t dc_flags) { dispatch_queue_t top_dq = top_dqu._dq; dispatch_queue_t dq = dqu._dq; if (unlikely(!dq->do_targetq)) { return _dispatch_sync_function_invoke(dq, ctxt, func); } pthread_priority_t pp = _dispatch_get_priority(); // 初始化保存 block 以及其他信息的结构体 struct dispatch_sync_context_s dsc = { .dc_flags = DC_FLAG_SYNC_WAITER | dc_flags, .dc_func = _dispatch_async_and_wait_invoke, .dc_ctxt = &dsc, .dc_other = top_dq, .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG, .dc_voucher = _voucher_get(), .dsc_func = func, .dsc_ctxt = ctxt, .dsc_waiter = _dispatch_tid_self(), }; // 将 block push 到 queue 里面去 _dispatch_trace_item_push(top_dq, &dsc); // 死锁的最终函数 __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq); // ... 忽略了一些暂时不需要分析的代码 } 复制代码
通过 _dispatch_trace_item_push
函数可以发现队列其实就是一个用来提交 block
的对象,当 block
push 到队列中后,将按照 先入先出(FIFO) 的顺序进行处理,系统在 GCD
的底层会维护一个线程池,用来执行这些 block
。那这些 block
又是在哪调用的呢?
由于步骤有点多,我就不一一分析了,把大致的流程图写在下面,大家可以自己去跟一下,体验一下源码的乐趣。
dispatch_sync └──_dispatch_sync_f └──_dispatch_sync_invoke_and_complete └──_dispatch_sync_function_invoke_inline └──_dispatch_client_callout └──f(ctxt); 复制代码
第一个同步是怎么执行的问题解决了,接下来解决第二个死锁问题,要想解决这个问题,就需要来到 __DISPATCH_WAIT_FOR_QUEUE__
里一探究竟。
static void __DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq) { // 获取队列的状态,看是否是处于等待状态 uint64_t dq_state = _dispatch_wait_prepare(dq); if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) { DISPATCH_CLIENT_CRASH((uintptr_t)dq_state, "dispatch_sync called on queue " "already owned by current thread"); } // ... 忽略了一些暂时不需要分析的代码 } 复制代码
获取到队列的状态之后,又调用 _dq_state_drain_locked_by
函数最后来到下面这个函数,这个函数就是死锁的关键所在。
static inline bool _dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid) { // lock_value 为队列状态,tid 为线程 id // ^ (异或运算法) 两个相同就会出现 0 否则为1 return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0; } 复制代码
从上面可以看出,如果队列和线程都是处于等待状态,最终的返回结果为 YES
,就会来到 DISPATCH_CLIENT_CRASH
造成 crash
。
分析完同步函数之后,接下来就该来分析分析异步函数了,异步函数会需要开启线程去执行任务,所以这应该会是一个重点,找到重点之后我们就可以继续研究了,一样先来到异步函数的实现:
void dispatch_async(dispatch_queue_t dq, dispatch_block_t work) { dispatch_continuation_t dc = _dispatch_continuation_alloc(); uintptr_t dc_flags = DC_FLAG_CONSUME; dispatch_qos_t qos; qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags); _dispatch_continuation_async(dq, dc, qos, dc->dc_flags); } 复制代码
异步函数会通过 _dispatch_continuation_init
先对 block
进行包装即函数式保存,因为异步需要在合适的时机进行线程回调 block
,之后就调用 _dispatch_continuation_async
进行异步处理,由于异步的分支比较多,我下面列出我所分析的步骤:
dispatch_async └──_dispatch_continuation_async └──dx_push └──dq_push └──_dispatch_root_queue_push └──_dispatch_root_queue_push_inline └──_dispatch_root_queue_poke └──_dispatch_root_queue_poke_slow 复制代码
在 _dispatch_root_queue_poke_slow
里就能找到开辟线程相关的代码了,那我们就来分析一波吧:
static void _dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor) { int remaining = n; int r = ENOSYS; _dispatch_root_queues_init(); _dispatch_debug_root_queue(dq, __func__); _dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n); #if !DISPATCH_USE_INTERNAL_WORKQUEUE #if DISPATCH_USE_PTHREAD_ROOT_QUEUES if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) #endif { _dispatch_root_queue_debug("requesting new worker thread for global " "queue: %p", dq); r = _pthread_workqueue_addthreads(remaining, _dispatch_priority_to_pp_prefer_fallback(dq->dq_priority)); (void)dispatch_assume_zero(r); return; } #endif // !DISPATCH_USE_INTERNAL_WORKQUEUE #if DISPATCH_USE_PTHREAD_POOL dispatch_pthread_root_queue_context_t pqc = dq->do_ctxt; if (likely(pqc->dpq_thread_mediator.do_vtable)) { while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) { _dispatch_root_queue_debug("signaled sleeping worker for " "global queue: %p", dq); if (!--remaining) { return; } } } bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT; if (overcommit) { // 串行队列 os_atomic_add2o(dq, dgq_pending, remaining, relaxed); } else { if (!os_atomic_cmpxchg2o(dq, dgq_pending, 0, remaining, relaxed)) { _dispatch_root_queue_debug("worker thread request still pending for " "global queue: %p", dq); return; } } // floor 为 0,remaining 是根据队列任务的情况处理的 int can_request, t_count; // 获取线程池的大小 t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered); do { // 计算可以请求的数量 can_request = t_count < floor ? 0 : t_count - floor; if (remaining > can_request) { _dispatch_root_queue_debug("pthread pool reducing request from %d to %d", remaining, can_request); os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed); remaining = can_request; } if (remaining == 0) { // 线程池满了,就会报出异常的情况 _dispatch_root_queue_debug("pthread pool is full for root queue: " "%p", dq); return; } } while (!os_atomic_cmpxchgvw2o(dq, dgq_thread_pool_size, t_count, t_count - remaining, &t_count, acquire)); pthread_attr_t *attr = &pqc->dpq_thread_attr; pthread_t tid, *pthr = &tid; #if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES if (unlikely(dq == &_dispatch_mgr_root_queue)) { pthr = _dispatch_mgr_root_queue_init(); } #endif do { _dispatch_retain(dq); // 开辟线程 while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) { if (r != EAGAIN) { (void)dispatch_assume_zero(r); } _dispatch_temporary_resource_shortage(); } } while (--remaining); #else (void)floor; #endif // DISPATCH_USE_PTHREAD_POOL } 复制代码
好了,同步和异步暂时分析到这,接下来分析的内容相对比较简单,semaphore
和 group
。
关于信号量的 API
不多,主要是三个,create
、wait
和 signal
。
信号量在初始化时要指定 value
,随后内部将这个 value
存储起来。实际操作时会存两个 value
,一个是当前的value
,一个是记录初始 value
。信号的 wait
和 signal
是互逆的两个操作,如果 value
大于 0,前者将 value
减一,此时如果 value
小于零就一直等待。初始 value
必须大于等于 0,如果为 0 并随后调用 wait
方法,线程将被阻塞直到别的线程调用了 signal
方法。简单的介绍了一下使用方法,接下来直接来看源码吧。
这个函数就是去初始化 dsema
,并设置 value
的值。
dispatch_semaphore_t dispatch_semaphore_create(long value) { dispatch_semaphore_t dsema; // 如果 value 小于 0 直接返回 0 if (value < 0) { return DISPATCH_BAD_INPUT; } dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore), sizeof(struct dispatch_semaphore_s)); dsema->do_next = DISPATCH_OBJECT_LISTLESS; dsema->do_targetq = _dispatch_get_default_queue(false); dsema->dsema_value = value; _dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO); dsema->dsema_orig = value; return dsema; } 复制代码
long dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout) { long value = os_atomic_dec2o(dsema, dsema_value, acquire); if (likely(value >= 0)) { return 0; } return _dispatch_semaphore_wait_slow(dsema, timeout); } 复制代码
dispatch_atomic_dec2o
是一个宏,会调用 GCC
内置的函数 __sync_sub_and_fetch
,实现减法的原子性操作。因此这一行的意思是将 dsema
的值减一,并把新的值赋给 value
。如果减一后的 value
大于等于 0 就立刻返回,没有任何操作,否则进入等待状态。
_dispatch_semaphore_wait_slow
函数针对不同的 timeout
参数,分了三种情况考虑:
case DISPATCH_TIME_NOW: orig = dsema->dsema_value; while (orig < 0) { if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1, &orig, relaxed)) { return _DSEMA4_TIMEOUT(); } } 复制代码
这种情况 while
判断一定会成立,因为如果 value
大于等于 0,在上一个函数 dispatch_semaphore_wait
中就已经返回了,判断成立,内部的 if
判断一定也成立,此时会将 value
加一(也就是变为 0) 并返回。加一的原因是为了抵消 wait
函数一开始的减一操作。此时函数调用方会得到返回值 KERN_OPERATION_TIMED_OUT
,表示由于等待时间超时而返回。
第二种情况是 DISPATCH_TIME_FOREVER
这个 case
:
case DISPATCH_TIME_FOREVER: _dispatch_sema4_wait(&dsema->dsema_sema); break; } 复制代码
这种情况会调用系统的 semaphore_wait
方法继续等待,直到收到 signal
调用。
第三种就是一个默认的情况:
default: if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) { break; } 复制代码
在default
分支下,我们指定一个超时时间,在 _dispatch_sema4_timedwait
里面会去判断当前操作是否超时,这和 DISPATCH_TIME_FOREVER
的处理比较类似,不同的是我们调用了内核提供的 semaphore_timedwait
方法可以指定超时时间。
这个函数的实现相对来说比较简单,因为它不需要阻塞,只用唤醒。
long dispatch_semaphore_signal(dispatch_semaphore_t dsema) { long value = os_atomic_inc2o(dsema, dsema_value, release); if (likely(value > 0)) { return 0; } if (unlikely(value == LONG_MIN)) { DISPATCH_CLIENT_CRASH(value, "Unbalanced call to dispatch_semaphore_signal()"); } return _dispatch_semaphore_signal_slow(dsema); } 复制代码
当我们了解了信号量之后,group
是一个非常容易理解的概念,我们先看看如何创建 group
:
dispatch_group_t dispatch_group_create(void) { return _dispatch_group_create_with_count(0); } static inline dispatch_group_t _dispatch_group_create_with_count(uint32_t n) { dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group), sizeof(struct dispatch_group_s)); dg->do_next = DISPATCH_OBJECT_LISTLESS; dg->do_targetq = _dispatch_get_default_queue(false); if (n) { os_atomic_store2o(dg, dg_bits, -n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed); os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411> } return dg; } 复制代码
这个方法就是开辟了内存空间,但是从 os_atomic_store2o
可以看出 group
底层也维护了一个 value
值。
从 enter
可以看出,当进组的时候会通过 os_atomic_sub_orig2o
对 value
减 4。
void dispatch_group_enter(dispatch_group_t dg) { uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits, DISPATCH_GROUP_VALUE_INTERVAL, acquire); uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK; if (unlikely(old_value == 0)) { _dispatch_retain(dg); // <rdar://problem/22318411> } if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) { DISPATCH_CLIENT_CRASH(old_bits, "Too many nested calls to dispatch_group_enter()"); } } 复制代码
出组的时候会对 value
进行加值,如果 new_state
和 old_state
相等,就会调用 _dispatch_group_wake
继续后面代码的执行,这个函数后面会说到。
void dispatch_group_leave(dispatch_group_t dg) { uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state, DISPATCH_GROUP_VALUE_INTERVAL, release); uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK); if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) { old_state += DISPATCH_GROUP_VALUE_INTERVAL; do { new_state = old_state; if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) { new_state &= ~DISPATCH_GROUP_HAS_WAITERS; new_state &= ~DISPATCH_GROUP_HAS_NOTIFS; } else { new_state &= ~DISPATCH_GROUP_HAS_NOTIFS; } if (old_state == new_state) break; } while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state, old_state, new_state, &old_state, relaxed))); return _dispatch_group_wake(dg, old_state, true); } if (unlikely(old_value == 0)) { DISPATCH_CLIENT_CRASH((uintptr_t)old_value, "Unbalanced call to dispatch_group_leave()"); } } 复制代码
dispatch_group_async
就是对 enter
和 leave
的封装,当 block
调用完成之后进行 callout
之后就出组了。
void dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq, dispatch_block_t db) { dispatch_continuation_t dc = _dispatch_continuation_alloc(); uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC; dispatch_qos_t qos; // 保存任务 qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags); _dispatch_continuation_group_async(dg, dq, dc, qos); } static inline void _dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq, dispatch_continuation_t dc, dispatch_qos_t qos) { // 进组 dispatch_group_enter(dg); dc->dc_data = dg; _dispatch_continuation_async(dq, dc, qos, dc->dc_flags); } static inline void _dispatch_continuation_with_group_invoke(dispatch_continuation_t dc) { struct dispatch_object_s *dou = dc->dc_data; unsigned long type = dx_type(dou); if (type == DISPATCH_GROUP_TYPE) { _dispatch_client_callout(dc->dc_ctxt, dc->dc_func); _dispatch_trace_item_complete(dc); // 出组 dispatch_group_leave((dispatch_group_t)dou); } else { DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type"); } } 复制代码
这个方法用于等待 group
中所有任务执行完成,可以理解为信号量 wait
的封装:
long dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout) { uint64_t old_state, new_state; os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, relaxed, { if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) { os_atomic_rmw_loop_give_up_with_fence(acquire, return 0); } if (unlikely(timeout == 0)) { os_atomic_rmw_loop_give_up(return _DSEMA4_TIMEOUT()); } new_state = old_state | DISPATCH_GROUP_HAS_WAITERS; if (unlikely(old_state & DISPATCH_GROUP_HAS_WAITERS)) { os_atomic_rmw_loop_give_up(break); } }); return _dispatch_group_wait_slow(dg, _dg_state_gen(new_state), timeout); } 复制代码
如果当前 value
和原始 value
相同,表明任务已经全部完成,直接返回 0,如果 timeout
为 0 也会立刻返回,否则调用 _dispatch_group_wait_slow
。
在 _dispatch_group_wait_slow
会一直等到任务完成返回 0 ,当然如果一直没有完成就会返回 timeout
。
这个函数主要做的就是循环调用 dispatch_async_f
异步执行在 notify
函数中注册的回调。
static void _dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release) { uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411> if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) { dispatch_continuation_t dc, next_dc, tail; // Snapshot before anything is notified/woken dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail); do { dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data; next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next); _dispatch_continuation_async(dsn_queue, dc, _dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags); _dispatch_release(dsn_queue); } while ((dc = next_dc)); refs++; } if (dg_state & DISPATCH_GROUP_HAS_WAITERS) { _dispatch_wake_by_address(&dg->dg_gen); } if (refs) _dispatch_release_n(dg, refs); } 复制代码
dispatch_once
仅仅是一个包装,内部直接调用了 dispatch_once_f
:
void dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func) { dispatch_once_gate_t l = (dispatch_once_gate_t)val; #if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER uintptr_t v = os_atomic_load(&l->dgo_once, acquire); if (likely(v == DLOCK_ONCE_DONE)) { return; } #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER if (likely(DISPATCH_ONCE_IS_GEN(v))) { return _dispatch_once_mark_done_if_quiesced(l, v); } #endif #endif if (_dispatch_once_gate_tryenter(l)) { return _dispatch_once_callout(l, ctxt, func); } return _dispatch_once_wait(l); } 复制代码
第一次调用时外部传进来的 onceToken
还是空指针,所以 val
为 NULL
,_dispatch_once_gate_tryenter(l)
判断 l->dgo_once
是否标记为 DLOCK_ONCE_UNLOCKED
,但是 #define DLOCK_ONCE_UNLOCKED ((uintptr_t)0)
,所以 if 判断是成立的,就会进行 block
回调,然后在通过 _dispatch_once_gate_broadcast
将 l->dgo_once
标记为 DLOCK_ONCE_DONE
,下次再进来的时候就会直接返回保证代码只执行一次。
本文主要整理了 GCD
中常见的 API
以及底层的实现原理。
dispatch_sync
将任务 block
通过 push
到队列中,然后按照 FIFO
去执行。dispatch_async
会把任务包装并保存,之后就会开辟相应线程去执行已保存的任务。semaphore
主要使用 signal
和 wait
这两个接口,底层分别调用了内核提供的方法。在调用 signal
方法后,先将 value
减一,如果大于零立刻返回,否则陷入等待。signal
方法将信号量加一,如果 value
大于零立刻返回,否则说明唤醒了某一个等待线程,此时由系统决定哪个线程的等待方法可以返回。dispatch_group
底层也是维护了一个 value
的值,等待 group
完成实际上就是等待 value
恢复初始值。而 notify
的作用是将所有注册的回调组装成一个链表,在 dispatch_async
完成时判断 value
是不是恢复初始值,如果是则调用 dispatch_async
异步执行所有注册的回调。dispatch_once
通过一个静态变量来标记 block
是否已被执行,同时使用加锁确保只有一个线程能执行,执行完 block
后会唤醒其他所有等待的线程。最后在此感谢 深入了解GCD 对我的启发。