一口一口吃掉GCD (下)

从源码层面分析GCD的底层实现原理

Posted by Marco Liu on July 12, 2017

“Yeah I’m here’. ”

正文

dispatch_semaphore

dispatch_semaphore_s是性能稍次于自旋锁的的信号量对象,用来保证资源使用的安全性。

它主要有三个API,分别是dispatch_semaphore_createdispatch_semaphore_waitdispatch_semaphore_signal

信号量在初始化时要指定 value,随后内部将这个 value 存储起来。实际操作时会存两个 value,一个是当前的 value,一个是记录初始 value。

信号的 waitsignal 是互逆的两个操作。如果 value 大于 0,前者将 value 减一,此时如果 value 小于零就一直等待。

初始 value 必须大于等于 0,如果为 0 并随后调用 wait 方法,线程将被阻塞直到别的线程调用了 signal 方法。

DISPATCH_DECL(dispatch_semaphore);

#define DISPATCH_DECL(name) typedef struct name##_s *name##_t;

struct dispatch_semaphore_s {
    DISPATCH_STRUCT_HEADER(dispatch_semaphore_s, dispatch_semaphore_vtable_s);
    long dsema_value;   // 当前信号值,当这个值小于0时无法访问加锁资源
    long dsema_orig;    // 初始化信号值,限制了同时访问资源的线程数量
    
    size_t dsema_sent_ksignals; //由于mach信号可能会被意外唤醒,通过原子操作来避免虚假信号
    
    semaphore_t dsema_port;
    semaphore_t dsema_waiter_port;
    size_t dsema_group_waiters;
    struct dispatch_sema_notify_s *dsema_notify_head;
    struct dispatch_sema_notify_s *dsema_notify_tail;
};

dispatch_semaphore_create

创建一个具有初始值的计数信号量。如果有两个线程共同完成某一项工作时,可以使用一个初始值为0的semaphore。如果你需要管理一个有限的资源池时,应该使用一个初始值为资源池的大小的semaphore。如果不再使用这个semaphore时,应该使用dispatch_release销毁semaphore对象并且释放他的内存。(在ARC模式下不需要,系统会自动释放)

dispatch_semaphore_t dispatch_semaphore_create(long value){
    dispatch_semaphore_t dsema;

    if (value < 0) {
        return NULL;
    }
    // 申请内存
    dsema = calloc(1, sizeof(struct dispatch_semaphore_s));
    
    if (fastpath(dsema)) {
        // do_vtable里面主要包含了这个 dispatch_semaphore_s 的操作函数
        dsema->do_vtable = &_dispatch_semaphore_vtable;

        //可以理解为链表的结尾标记
        dsema->do_next = DISPATCH_OBJECT_LISTLESS;

        // 引用计数
        dsema->do_ref_cnt = 1;
        dsema->do_xref_cnt = 1;

        // 目标队列
        dsema->do_targetq = dispatch_get_global_queue(0, 0);

        // 信号值
        dsema->dsema_value = value;
        dsema->dsema_orig = value;
    }
    
    return dsema;
}

dispatch_semaphore_wait

long dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout) {
	long value = dispatch_atomic_dec2o(dsema, dsema_value);
	dispatch_atomic_acquire_barrier();
	if (fastpath(value >= 0)) {
		return 0;
	}
	return _dispatch_semaphore_wait_slow(dsema, timeout);
}

第一行的 dispatch_atomic_dec2o 是一个宏,会调用 GCC 内置的函数 __sync_sub_and_fetch,实现减法的原子性操作。因此这一行的意思是将 dsema 的值减一,并把新的值赋给 value。

如果减一后的 value 大于等于 0 就立刻返回,没有任何操作,否则进入等待状态,阻塞线程,等待FIFO中的信号量的到来。

_dispatch_semaphore_wait_slow 函数针对不同的 timeout 参数,分了三种情况考虑:

case DISPATCH_TIME_NOW:
	while ((orig = dsema->dsema_value) < 0) {
		if (dispatch_atomic_cmpxchg2o(dsema, dsema_value, orig, orig + 1)) {
			return KERN_OPERATION_TIMED_OUT;
		}
	}

这种情况下会立刻判断 dsema->dsema_valueorig 是否相等。如果 while 判断成立,内部的 if 判断一定也成立,此时会将 value 加一(也就是变为 0) 并返回。加一的原因是为了抵消 wait 函数一开始的减一操作。此时函数调用方会得到返回值 KERN_OPERATION_TIMED_OUT,表示由于等待时间超时而返回。

实际上 while 判断一定会成立,因为如果 value 大于等于 0,在上一个函数 dispatch_semaphore_wait 中就已经返回了。

第二种情况是 DISPATCH_TIME_FOREVER 这个 case:

case DISPATCH_TIME_FOREVER:
	do {
		kr = semaphore_wait(dsema->dsema_port);
	} while (kr == KERN_ABORTED);
	break;

进入 do-while 循环后会调用系统的 semaphore_wait 方法,KERN_ABORTED 表示调用者被一个与信号量系统无关的原因唤醒。因此一旦发生这种情况,还是要继续等待,直到收到 signal 调用。

在其他情况下(default 分支),我们指定一个超时时间,这和 DISPATCH_TIME_FOREVER 的处理比较类似,不同的是我们调用了内核提供的 semaphore_timedwait 方法可以指定超时时间。

整个函数的框架如下:

static long _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout) {
again:
    while ((orig = dsema->dsema_sent_ksignals)) {
		if (dispatch_atomic_cmpxchg2o(dsema, dsema_sent_ksignals, orig,
				orig - 1)) {
			return 0;
		}
	}
	switch (timeout) {
	    default:  /* semaphore_timedwait */
	    case DISPATCH_TIME_NOW: /* KERN_OPERATION_TIMED_OUT */
	    case DISPATCH_TIME_FOREVER: /* semaphore_wait */
	}
	goto again;
}

可见信号量被唤醒后,会回到最开始的地方,进入 while 循环。这个判断条件一般都会成立,极端情况下由于内核存在 bug,导致 origdsema_sent_ksignals 不相等,也就是收到虚假 signal 信号时会忽略。

进入 while 循环后,if 判断一定成立,因此返回 0,正如文档所说,返回 0 表示成功,否则表示超时。


dispatch_semaphore_signal

这个函数的实现相对来说比较简单,因为它不需要阻塞,只用唤醒。简化版源码如下:

long dispatch_semaphore_signal(dispatch_semaphore_t dsema) {
	long value = dispatch_atomic_inc2o(dsema, dsema_value);
	if (fastpath(value > 0)) {
		return 0;
	}
	return _dispatch_semaphore_signal_slow(dsema);
}

首先会调用原子方法让 value 加一,如果大于零就立刻返回 0,否则返回 _dispatch_semaphore_signal_slow:

long _dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema) {
	(void)dispatch_atomic_inc2o(dsema, dsema_sent_ksignals);
    _dispatch_semaphore_create_port(&dsema->dsema_port);
	kern_return_t kr = semaphore_signal(dsema->dsema_port);
	return 1;
}

它的作用仅仅是调用内核的 semaphore_signal 函数唤醒信号量,然后返回 1。这也符合文档中的描述:“如果唤醒了线程,返回非 0,否则返回 0”。


dispatch_group

这篇文章主要分析一下dispatch_group的底层实现,主要是包括几个重要的函数: dispatch_group_createdispatch_group_enterdispatch_group_leavedispatch_group_waitdispatch_group_notifydispatch_group_async


dispatch_group_create

dispatch_group_t dispatch_group_create(void) {
	dispatch_group_t dg = _dispatch_alloc(DISPATCH_VTABLE(group), sizeof(struct dispatch_semaphore_s));
	_dispatch_semaphore_init(LONG_MAX, dg);
	return dg;
}

没错,group 就是一个 value 为 LONG_MAX 的信号量。


dispatch_group_enter

void dispatch_group_enter(dispatch_group_t dg) {
	dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;
	(void)dispatch_semaphore_wait(dsema, DISPATCH_TIME_FOREVER);
}

这个方法也没做什么,就是调用 wait 方法让信号量的 value 减一而已。


dispatch_group_leave

在介绍 dispatch_async 函数时,我们看到任务在被执行时,还会调用 dispatch_group_leave 函数:

void dispatch_group_leave(dispatch_group_t dg) {
	dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;
	long value = dispatch_atomic_inc2o(dsema, dsema_value);
	if (slowpath(value == dsema->dsema_orig)) {
		(void)_dispatch_group_wake(dsema);
	}
}

当 group 的 value 变为初始值时,表示所有任务都已执行完,开始调用 _dispatch_group_wake 处理回调。

_dispatch_group_wake
static long _dispatch_group_wake(dispatch_semaphore_t dsema) {
	struct dispatch_sema_notify_s *next, *head, *tail = NULL;
	long rval;
	head = dispatch_atomic_xchg2o(dsema, dsema_notify_head, NULL);
	
	if (head) {
		tail = dispatch_atomic_xchg2o(dsema, dsema_notify_tail, NULL);
	}
	rval = dispatch_atomic_xchg2o(dsema, dsema_group_waiters, 0);
	if (rval) {
		_dispatch_semaphore_create_port(&dsema->dsema_waiter_port);
		do {
			kern_return_t kr = semaphore_signal(dsema->dsema_waiter_port);
		} while (--rval);
	}
	if (head) {
		// async group notify blocks
		do {
			dispatch_async_f(head->dsn_queue, head->dsn_ctxt, head->dsn_func);
			next = fastpath(head->dsn_next);
			if (!next && head != tail) {
				while (!(next = fastpath(head->dsn_next))) {
					_dispatch_hardware_pause();
				}
			}
			free(head);
		} while ((head = next));
	}
	return 0;
}

这个函数主要分为两部分,首先循环调用 semaphore_signal 告知唤醒当初等待 group 的信号量,因此 dispatch_group_wait 函数得以返回。

然后获取链表,依次调用 dispatch_async_f 异步执行在 notify 函数中注册的回调。


dispatch_group_wait

这个方法用于等待 group 中所有任务执行完成,可以理解为信号量 wait 的封装:

long dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout) {
	dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;
	if (dsema->dsema_value == dsema->dsema_orig) {
		return 0;
	}
	if (timeout == 0) {
		return KERN_OPERATION_TIMED_OUT;
	}
	return _dispatch_group_wait_slow(dsema, timeout);
}

如果当前 value 和原始 value 相同,表明任务已经全部完成,直接返回 0,如果 timeout 为 0 也会立刻返回,否则调用 _dispatch_group_wait_slow。这个方法等等待部分和 _dispatch_semaphore_signal_slow 几乎一致,区别在于等待结束后它不是 return,而是调用 _dispatch_group_wake 去唤醒这个 group。

static long _dispatch_group_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout) {
again:
    _dispatch_group_wake(dsema);
    switch (timeout) {/* 三种情况分类 */}    
    goto again;
}

这里我们暂时跳过 _dispatch_group_wake,后面会有详细分析。只要知道这个函数在 group 中所有事件执行完后会被调用即可。


dispatch_group_notify

老习惯,这个函数仅仅是封装了 dispatch_group_notify_f:

void dispatch_group_notify_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt, void (*func)(void *)) {
	dispatch_semaphore_t dsema = (dispatch_semaphore_t)dg;
	struct dispatch_sema_notify_s *dsn, *prev;

	dsn->dsn_queue = dq;
	dsn->dsn_ctxt = ctxt;
	dsn->dsn_func = func;
	prev = dispatch_atomic_xchg2o(dsema, dsema_notify_tail, dsn);
	if (fastpath(prev)) {
		prev->dsn_next = dsn;
	} else {/* ... */}
}

这种结构的代码我们已经遇到多次了,它其实就是在链表的尾部续上新的元素。所以 notify 方法并没有做过多的处理,只是是用链表把所有回调通知保存起来,等待调用。


dispatch_group_async

其实就是封装了 dispatch_async_f ,在 dispatch_async_f 内部调用了dispatch_group_enter 方法。

鸣谢

  1. Why do we use __builtin_expect when a straightforward way is to use if-else
  2. Posix线程编程指南(2) 线程私有数据
  3. 选择 GCD 还是 NSTimer?
  4. 从NSTimer的失效性谈起(二):关于GCD Timer和libdispatch
  5. 变态的libDispatch源码分析
  6. bestswifter–深入理解GCD
  7. 凌云的博客–GCD源码分析