一口一口吃掉GCD (中)

从源码层面分析GCD的底层实现原理

Posted by Marco Liu on July 11, 2017

“Yeah I’m here’. ”

正文

dispatch_once


typedef long dispatch_once_t;


void dispatch_once(dispatch_once_t *val, void (^block)(void)){
    struct Block_basic *bb = (void *)block;
    dispatch_once_f(val, block, (void *)bb->Block_invoke);
}

void dispatch_once_f(dispatch_once_t *val, void *ctxt, void (*func)(void *)){
    
    volatile long *vval = val;
    if (dispatch_atomic_cmpxchg(val, 0l, 1l)) {
        func(ctxt); // block真正执行
        dispatch_atomic_barrier();
        *val = ~0l;
    } 
    else 
    {
        do
        {
            _dispatch_hardware_pause();
        } while (*vval != ~0l);
        dispatch_atomic_barrier();
    }
}

可以看到dispatch_once实际上调用了dispatch_once_f

在多线程环境中,如果某一个线程A首次进入dispatch_once_fval==0,这个时候直接将其原子操作设为1,然后执行传入dispatch_once_f的block,然后调用dispatch_atomic_barrier,最后将val的值修改为~0。

dispatch_atomic_barrier是一种内存屏障,因为处理器为了加快处理速度,可能会乱序执行,在这里插入一个内存屏障,就相当于告诉编译器,屏障前后的指令顺序不能颠倒,告诉处理器,只有等屏障前的指令执行完了,屏障后的指令才能开始执行。所以这里dispatch_atomic_barrier能保证只有在block执行完毕后才能修改*val的值。


常用API解析

1.dispatch_async

直接上函数实现:

dispatch_async(dispatch_queue_t queue, dispatch_block_t block) {
    dispatch_async_f(dq, _dispatch_Block_copy(work), _dispatch_call_block_and_release);    
}

block 参数的类型是 dispatch_block_t,它是一个没有参数,没有返回值的 block:

typedef void (^dispatch_block_t)(void);

dispatch_async主要将参数进行了处理,它将 block 复制了一份,然后去调用dispatch_async_f

  • 1、_dispatch_Block_copy在堆上创建传入block的拷贝,或者增加引用计数,这样就保证了block在执行之前不会被销毁
  • 2、_dispatch_call_block_and_release的定义如下,顾名思义,调用block,然后将block销毁

dispatch_async_f

省略各种分支后的 dispatch_async_f 函数实现如下:

//func 参数是一个函数,在实际调用时,会把第二参数 ctxt 作为参数传入
void dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func) {
    dispatch_continuation_t dc;
	if (dq->dq_width == 1) {
		return dispatch_barrier_async_f(dq, ctxt, func);
	}
	dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;
	if (dq->do_targetq) {
		return _dispatch_async_f2(dq, dc);
	}
	_dispatch_queue_push(dq, dc);
}

可见如果是串行队列 (dq_width = 1),会调用 dispatch_barrier_async_f 函数处理,这个后文会有介绍,如果有 do_targetq 则进行转发。

如果是主队列,则直接调用_dispatch_queue_wakeup_main

如果是全局队列,调用 _dispatch_queue_push 入队。

这里的 dispatch_continuation_t 其实是对 block 的封装,然后调用 _dispatch_queue_push 这个宏将封装好的 block 放入队列中。

把这个宏展开,然后依次分析调用栈,选择一条主干调用线,结果如下:

_dispatch_queue_push
└──_dispatch_trace_queue_push
    └──_dispatch_queue_push
        └──_dispatch_queue_push_slow
            └──_dispatch_queue_push_list_slow2
                └──_dispatch_wakeup
                    └──dx_probe

队列中保存了一个链表,我们首先将新的 block 添加到链表尾部,然后调用 dx_probe 宏,它依赖于 vtable 数据结构,GCD 中的大部分对象,比如队列等,都具有这个数据结构。它定义了对象在不同操作下该执行的方法,比如在这里的 probe 操作下,实际上会执行 _dispatch_queue_wakeup_global 方法,调用栈如下

_dispatch_queue_wakeup_global
└──_dispatch_queue_wakeup_global2
    └──_dispatch_queue_wakeup_global_slow

_dispatch_queue_wakeup_global_slow

_dispatch_queue_wakeup_global_slow 我们见到了熟悉的老朋友,pthread 线程:


	// 如果线程池已满,则直接调用 _dispatch_worker_thread 
    // 否则创建线程池
static void _dispatch_queue_wakeup_global_slow(dispatch_queue_t dq, unsigned int n) {

     static dispatch_once_t pred;
    struct dispatch_root_queue_context_s *qc = dq->do_ctxt;
    pthread_workitem_handle_t wh;
    unsigned int gen_cnt;
    pthread_t pthr;
    int r, t_count;

    if (!dq->dq_items_tail) {
        return false;
    }

    _dispatch_safe_fork = false;

    dispatch_debug_queue(dq, __PRETTY_FUNCTION__);

    // 全局队列的检测,初始化和配置环境(只调用1次)
    dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);

    // 如果队列的dgq_kworkqueue不为空,则从
    if (qc->dgq_kworkqueue) {
        if (dispatch_atomic_cmpxchg(&qc->dgq_pending, 0, 1)) {
            _dispatch_debug("requesting new worker thread");

            r = pthread_workqueue_additem_np(qc->dgq_kworkqueue, _dispatch_worker_thread2, dq, &wh, &gen_cnt);
            dispatch_assume_zero(r);
        } else {
            _dispatch_debug("work thread request still pending on global queue: %p", dq);
        }
        goto out;
    }

    // 发送一个信号量,这是一种线程保活的方法
    if (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
        goto out;
    }

    // 检测线程池可用的大小,如果还有,则线程池减1
    do {
        t_count = qc->dgq_thread_pool_size;
        if (!t_count) {
            _dispatch_debug("The thread pool is full: %p", dq);
            goto out;
        }
    } while (!dispatch_atomic_cmpxchg(&qc->dgq_thread_pool_size, t_count, t_count - 1));

    // 使用pthread 库创建一个线程,线程的入口是_dispatch_worker_thread
    while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) {
        if (r != EAGAIN) {
            dispatch_assume_zero(r);
        }
        sleep(1);
    }

    // 调用pthread_detach,主线程与子线程分离,子线程结束后,资源自动回收
    r = pthread_detach(pthr);
    dispatch_assume_zero(r);

out:
    return false;
}

由此可见这里确实使用了线程池。创建线程后会执行 _dispatch_worker_thread 回调:


_dispatch_worker_thread2
void _dispatch_worker_thread2(void *context)
{
    struct dispatch_object_s *item;
    dispatch_queue_t dq = context;
    struct dispatch_root_queue_context_s *qc = dq->do_ctxt;

    if (_dispatch_thread_getspecific(dispatch_queue_key)) {
        DISPATCH_CRASH("Premature thread recycling");
    }

    // 把dq设置为刚启动的这个线程的TSD
    _dispatch_thread_setspecific(dispatch_queue_key, dq);
    qc->dgq_pending = 0;


    // _dispatch_queue_concurrent_drain_one用来取出队列的一个内容
    while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
        // 用来对取出的内容进行处理(如果是任务,则执行任务)
        _dispatch_continuation_pop(item);
    }

    _dispatch_thread_setspecific(dispatch_queue_key, NULL);

    _dispatch_force_cache_cleanup();
}
  • 1、一个是_dispatch_queue_concurrent_drain_one,用来取出队列的一个内容;
  • 2、另一个是_dispatch_continuation_pop函数,用来对取出的内容进行处理;

dispatch_async总结

dispatch_async 的实现比较复杂,主要是因为其中的数据结构较多,分支流程控制比较复杂,但思路其实很简单。

首先将block copy到队列里的链表的尾部,如果队列里没有任务,则唤醒队列,然后判断队列的类型。

  • 1、如果是串行队列,则新开一条子线程,并循环取出链表中保存的block放入子线程中执行,然后子线程和主线程并发执行。

  • 2、如果是主队列,则循环取出链表中保存的block,等待主线程中的任务都执行完,把链表中的任务block加入到主线程并执行。

  • 3、如果是并发队列,则检测队列的workqueue是否为空。如果不为空,则XNU内核根据系统状态和队列的优先级来决定是否生产新线程,如果无需生成新线程,则调用系统函数请求一条线程,并让线程池数-1。如果为空,则直接创建一个Pthread线程并启动,最后取出链表头指针的内容,内容可能是队列、任务block、group。如果是任务block,则执行。
    block执行完后,会等待一个信号量,时间为5秒。如果5秒内没有接收到新任务,则退出并销毁这个线程。


2.dispatch_sync

同步方法的实现相对来说和异步类似,而且更简单,只需要将任务压入响应的队列,并用信号量做等待,调用栈如下:

dispatch_sync
└──dispatch_sync_f
    └──_dispatch_sync_f2
        └──_dispatch_sync_f_slow

dispatch_sync_f
void dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
    typeof(dq->dq_running) prev_cnt;
    dispatch_queue_t old_dq;

    if (dq->dq_width == 1) {
        // 向一个串行队列中压进一个同步任务
        return dispatch_barrier_sync_f(dq, ctxt, func);
    }
    
    // 向一个并发队列中压进一个同步任务
    if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){    
        // 如果并发队列中存在其他任务或者队列被挂起,则直接进入_dispatch_sync_f_slow 
        // 函数,等待这个队列中的其他任务完成(信号量的方式),然后执行这个任务
        _dispatch_sync_f_slow(dq);
    }
    else{            
        prev_cnt = dispatch_atomic_add(&dq->dq_running, 2) - 2;

        if (slowpath(prev_cnt & 1)) {
                
            if (dispatch_atomic_sub(&dq->dq_running, 2) == 0) {
                // 队列已经为空,也没有正在执行的任务,需要唤醒队列
                _dispatch_wakeup(dq);
            }            
            // 队列已经为空,但是有正在执行的任务
            _dispatch_sync_f_slow(dq);
        }
    }

    old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
    _dispatch_thread_setspecific(dispatch_queue_key, dq);
    func(ctxt);
    _dispatch_workitem_inc();
    _dispatch_thread_setspecific(dispatch_queue_key, old_dq);

    if (slowpath(dispatch_atomic_sub(&dq->dq_running, 2) == 0)) {
        _dispatch_wakeup(dq);
    }
}

static void _dispatch_sync_f_slow(dispatch_queue_t dq)
{
	//直接调用_dispatch_wakeup唤醒队列执行任务
	// 信号等待保证同步
    dispatch_semaphore_wait(dss.dc_ctxt, DISPATCH_TIME_FOREVER);
    _dispatch_put_thread_semaphore(dss.dc_ctxt);
}

dispatch_sync总结

原理:将block压入相对应的队列,并且任务之间用信号量做等待

  • 如果是自己create的串行队列或者是并发队列,则不开辟子线程,在主线程中直接执行,无需等待主线程中的任务都执行完。

  • 如果是主队列,则主队列的任务要等主线程的任务执行完,而主线程的任务也要等主队列的block任务执行完,最后两者相互等待而卡死。