“Yeah I’m here’. ”
正文
dispatch_once
typedef long dispatch_once_t;
void dispatch_once(dispatch_once_t *val, void (^block)(void)){
struct Block_basic *bb = (void *)block;
dispatch_once_f(val, block, (void *)bb->Block_invoke);
}
void dispatch_once_f(dispatch_once_t *val, void *ctxt, void (*func)(void *)){
volatile long *vval = val;
if (dispatch_atomic_cmpxchg(val, 0l, 1l)) {
func(ctxt); // block真正执行
dispatch_atomic_barrier();
*val = ~0l;
}
else
{
do
{
_dispatch_hardware_pause();
} while (*vval != ~0l);
dispatch_atomic_barrier();
}
}
可以看到dispatch_once
实际上调用了dispatch_once_f
在多线程环境中,如果某一个线程A首次进入dispatch_once_f
,val==0,这个时候直接将其原子操作设为1,然后执行传入dispatch_once_f
的block,然后调用dispatch_atomic_barrier
,最后将val的值修改为~0。
dispatch_atomic_barrier
是一种内存屏障,因为处理器为了加快处理速度,可能会乱序执行,在这里插入一个内存屏障,就相当于告诉编译器,屏障前后的指令顺序不能颠倒,告诉处理器,只有等屏障前的指令执行完了,屏障后的指令才能开始执行。所以这里dispatch_atomic_barrier能保证只有在block执行完毕后才能修改*val的值。
常用API解析
1.dispatch_async
直接上函数实现:
dispatch_async(dispatch_queue_t queue, dispatch_block_t block) {
dispatch_async_f(dq, _dispatch_Block_copy(work), _dispatch_call_block_and_release);
}
block 参数的类型是 dispatch_block_t
,它是一个没有参数,没有返回值的 block:
typedef void (^dispatch_block_t)(void);
dispatch_async
主要将参数进行了处理,它将 block 复制了一份,然后去调用dispatch_async_f
- 1、
_dispatch_Block_copy
在堆上创建传入block的拷贝,或者增加引用计数,这样就保证了block在执行之前不会被销毁 - 2、
_dispatch_call_block_and_release
的定义如下,顾名思义,调用block,然后将block销毁
dispatch_async_f
省略各种分支后的 dispatch_async_f
函数实现如下:
//func 参数是一个函数,在实际调用时,会把第二参数 ctxt 作为参数传入
void dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
dispatch_continuation_t dc;
if (dq->dq_width == 1) {
return dispatch_barrier_async_f(dq, ctxt, func);
}
dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
dc->dc_func = func;
dc->dc_ctxt = ctxt;
if (dq->do_targetq) {
return _dispatch_async_f2(dq, dc);
}
_dispatch_queue_push(dq, dc);
}
可见如果是串行队列 (dq_width = 1),会调用 dispatch_barrier_async_f
函数处理,这个后文会有介绍,如果有 do_targetq
则进行转发。
如果是主队列,则直接调用_dispatch_queue_wakeup_main
如果是全局队列,调用 _dispatch_queue_push
入队。
这里的 dispatch_continuation_t
其实是对 block 的封装,然后调用 _dispatch_queue_push
这个宏将封装好的 block 放入队列中。
把这个宏展开,然后依次分析调用栈,选择一条主干调用线,结果如下:
_dispatch_queue_push
└──_dispatch_trace_queue_push
└──_dispatch_queue_push
└──_dispatch_queue_push_slow
└──_dispatch_queue_push_list_slow2
└──_dispatch_wakeup
└──dx_probe
队列中保存了一个链表,我们首先将新的 block 添加到链表尾部,然后调用 dx_probe
宏,它依赖于 vtable 数据结构,GCD 中的大部分对象,比如队列等,都具有这个数据结构。它定义了对象在不同操作下该执行的方法,比如在这里的 probe
操作下,实际上会执行 _dispatch_queue_wakeup_global
方法,调用栈如下
_dispatch_queue_wakeup_global
└──_dispatch_queue_wakeup_global2
└──_dispatch_queue_wakeup_global_slow
_dispatch_queue_wakeup_global_slow
在 _dispatch_queue_wakeup_global_slow
我们见到了熟悉的老朋友,pthread 线程:
// 如果线程池已满,则直接调用 _dispatch_worker_thread
// 否则创建线程池
static void _dispatch_queue_wakeup_global_slow(dispatch_queue_t dq, unsigned int n) {
static dispatch_once_t pred;
struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
pthread_workitem_handle_t wh;
unsigned int gen_cnt;
pthread_t pthr;
int r, t_count;
if (!dq->dq_items_tail) {
return false;
}
_dispatch_safe_fork = false;
dispatch_debug_queue(dq, __PRETTY_FUNCTION__);
// 全局队列的检测,初始化和配置环境(只调用1次)
dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);
// 如果队列的dgq_kworkqueue不为空,则从
if (qc->dgq_kworkqueue) {
if (dispatch_atomic_cmpxchg(&qc->dgq_pending, 0, 1)) {
_dispatch_debug("requesting new worker thread");
r = pthread_workqueue_additem_np(qc->dgq_kworkqueue, _dispatch_worker_thread2, dq, &wh, &gen_cnt);
dispatch_assume_zero(r);
} else {
_dispatch_debug("work thread request still pending on global queue: %p", dq);
}
goto out;
}
// 发送一个信号量,这是一种线程保活的方法
if (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
goto out;
}
// 检测线程池可用的大小,如果还有,则线程池减1
do {
t_count = qc->dgq_thread_pool_size;
if (!t_count) {
_dispatch_debug("The thread pool is full: %p", dq);
goto out;
}
} while (!dispatch_atomic_cmpxchg(&qc->dgq_thread_pool_size, t_count, t_count - 1));
// 使用pthread 库创建一个线程,线程的入口是_dispatch_worker_thread
while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
dispatch_assume_zero(r);
}
sleep(1);
}
// 调用pthread_detach,主线程与子线程分离,子线程结束后,资源自动回收
r = pthread_detach(pthr);
dispatch_assume_zero(r);
out:
return false;
}
由此可见这里确实使用了线程池。创建线程后会执行 _dispatch_worker_thread
回调:
_dispatch_worker_thread2
void _dispatch_worker_thread2(void *context)
{
struct dispatch_object_s *item;
dispatch_queue_t dq = context;
struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
if (_dispatch_thread_getspecific(dispatch_queue_key)) {
DISPATCH_CRASH("Premature thread recycling");
}
// 把dq设置为刚启动的这个线程的TSD
_dispatch_thread_setspecific(dispatch_queue_key, dq);
qc->dgq_pending = 0;
// _dispatch_queue_concurrent_drain_one用来取出队列的一个内容
while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
// 用来对取出的内容进行处理(如果是任务,则执行任务)
_dispatch_continuation_pop(item);
}
_dispatch_thread_setspecific(dispatch_queue_key, NULL);
_dispatch_force_cache_cleanup();
}
- 1、一个是
_dispatch_queue_concurrent_drain_one
,用来取出队列的一个内容; - 2、另一个是
_dispatch_continuation_pop
函数,用来对取出的内容进行处理;
dispatch_async总结
dispatch_async 的实现比较复杂,主要是因为其中的数据结构较多,分支流程控制比较复杂,但思路其实很简单。
首先将block
copy到队列里的链表的尾部,如果队列里没有任务,则唤醒队列,然后判断队列的类型。
-
1、如果是串行队列,则新开一条子线程,并循环取出链表中保存的block放入子线程中执行,然后子线程和主线程并发执行。
-
2、如果是主队列,则循环取出链表中保存的block,等待主线程中的任务都执行完,把链表中的任务block加入到主线程并执行。
-
3、如果是并发队列,则检测队列的workqueue是否为空。如果不为空,则XNU内核根据系统状态和队列的优先级来决定是否生产新线程,如果无需生成新线程,则调用系统函数请求一条线程,并让线程池数-1。如果为空,则直接创建一个Pthread线程并启动,最后取出链表头指针的内容,内容可能是队列、任务block、group。如果是任务block,则执行。
block执行完后,会等待一个信号量,时间为5秒。如果5秒内没有接收到新任务,则退出并销毁这个线程。
2.dispatch_sync
同步方法的实现相对来说和异步类似,而且更简单,只需要将任务压入响应的队列,并用信号量做等待,调用栈如下:
dispatch_sync
└──dispatch_sync_f
└──_dispatch_sync_f2
└──_dispatch_sync_f_slow
dispatch_sync_f
void dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
typeof(dq->dq_running) prev_cnt;
dispatch_queue_t old_dq;
if (dq->dq_width == 1) {
// 向一个串行队列中压进一个同步任务
return dispatch_barrier_sync_f(dq, ctxt, func);
}
// 向一个并发队列中压进一个同步任务
if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
// 如果并发队列中存在其他任务或者队列被挂起,则直接进入_dispatch_sync_f_slow
// 函数,等待这个队列中的其他任务完成(信号量的方式),然后执行这个任务
_dispatch_sync_f_slow(dq);
}
else{
prev_cnt = dispatch_atomic_add(&dq->dq_running, 2) - 2;
if (slowpath(prev_cnt & 1)) {
if (dispatch_atomic_sub(&dq->dq_running, 2) == 0) {
// 队列已经为空,也没有正在执行的任务,需要唤醒队列
_dispatch_wakeup(dq);
}
// 队列已经为空,但是有正在执行的任务
_dispatch_sync_f_slow(dq);
}
}
old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
_dispatch_thread_setspecific(dispatch_queue_key, dq);
func(ctxt);
_dispatch_workitem_inc();
_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
if (slowpath(dispatch_atomic_sub(&dq->dq_running, 2) == 0)) {
_dispatch_wakeup(dq);
}
}
static void _dispatch_sync_f_slow(dispatch_queue_t dq)
{
//直接调用_dispatch_wakeup唤醒队列执行任务
// 信号等待保证同步
dispatch_semaphore_wait(dss.dc_ctxt, DISPATCH_TIME_FOREVER);
_dispatch_put_thread_semaphore(dss.dc_ctxt);
}
dispatch_sync总结
原理:将block压入相对应的队列,并且任务之间用信号量做等待
-
如果是自己create的串行队列或者是并发队列,则不开辟子线程,在主线程中直接执行,无需等待主线程中的任务都执行完。
-
如果是主队列,则主队列的任务要等主线程的任务执行完,而主线程的任务也要等主队列的block任务执行完,最后两者相互等待而卡死。