这几天研究了一下多核平台的同步机制的性能测试。主要是对比了自旋CAS锁和互斥锁的性能差距。
自旋锁是一种乐观锁,它认为共享的变量被其他线程占据时不会占据太长的时间,因此,自旋锁会在死循环里不断检查锁状态,而不是像互斥锁一样将当前任务挂起(互斥锁是一种悲观锁)。通常来讲,切换进程需要消耗的资源要大于在while里多循环几次,因此从理论上来讲,自旋锁对于共享资源不会被长时间占据的情况下,性能会优于互斥锁。
自旋锁的一种实现方式是CAS(compare and swap)操作,CAS指令执行时,当且仅当内存地址V的值与预期值A相等时,将内存地址V的值修改为B,否则就什么都不做。整个比较并替换的操作是一个原子操作。GNU提供了现成的函数__sync_bool_compare_and_swap
。当然,C++、Windows、Java等也提供了各自的类似函数。)
CAS方式设计的自旋锁的优点在于十分轻量,不断的while循环检查即可,但也有一个致命缺点,就是ABA问题,如果在下一次检查前,原来的数据A被换为B又换为A后,便会错误的认为检查结果和预期值一致,实际上数据发生过了更改。对于无锁链表很容易遇到这类问题,解决办法可以是添加“版本号”进行额外检查,任何操作都会使版本号加一,这样即使还是A,但是版本号不同了。
对于互斥锁,GNU的线程相关的类也提供了类似的现成操作:pthread_mutex_lock
和pthread_mutex_unlock
。在资源被当前线程上锁后,其他线程访问该资源时会获取锁失败,并被挂起。
影响同步性能的除了锁的复杂性以外,还有一个重要因素就是CACHE命中。CACHE是CPU和内存之间数据交换的一个高速中间缓冲。当CPU需要数据的时候,数据会首先从内存拷贝到CACHE中,CPU中CACHE中取数据。为了效率,通常会把所需数据周围的内存一块拷贝到CACHE中。举例,假设CPU需要变量A,此时包含变量的A的内存块被拷贝到CACHE,CPU用完变量A后接下来需要变量B,如果变量B正好挨着变量A存放,那么CPU可以直接从CACHE中取出变量B,这叫做缓存命中,程序的执行效率就会比较高。如果不幸,变量B的位置和变量A间隔太远,超过了CACHE的大小,那么CPU不得不发出请求,从二级缓存、三级缓存甚至内存中,去寻找变量B,把变量B复制到CACHE中来,这就是缓存MISS。两种情况一比较,很明显,AB相邻的情况程序执行效率高,这就是缓存友好型代码。一个很经典的例子就是给二维数组赋值,当数组大小超过CACHE大小后,按行赋值的效率会比按列赋值的速度要快,就是因为,所取的一行数据,总是相邻的,因此发生miss的概率就要低。
下面是同步机制的测试代码和测试结果。测试思路就是多个线程对同一个矩阵进行翻转,因此需要上锁。如果要测试缓存miss情况下的同步效率则不做真实翻转,只对数组前几个元素做简单的赋值操作,使总操作次数与真实翻转所需要的次数一致即可。
#ifndef _GNU_SOURCE #define _GNU_SOURCE #endif #include <stdio.h> #include <pthread.h> #include <unistd.h> #include <stdlib.h> #include <sched.h> #include <linux/unistd.h> #include <sys/syscall.h> #include <errno.h> #include <linux/types.h> #include <sys/time.h> #include <stdbool.h> #define INC_TO 10000 // 累加次数 #define USE_MUTEX 0 //使用mutex加锁测试还是使用CAS测试 #define CACHE_MISS 0 //是否miss缓存 int global_int = 0; pthread_mutex_t count_lock = PTHREAD_MUTEX_INITIALIZER; //矩阵翻转 #define MATSIZE 128 int mat[MATSIZE][MATSIZE]; void transpose() { for (int i = 0; i < MATSIZE; i++) for (int j = 0; j < MATSIZE; j++) { #if CACHE_MISS //真的翻转矩阵 int aux = mat[i][j]; mat[i][j] = mat[j][i]; mat[j][i] = aux; #else //假的翻转矩阵,随便做些无意义操作,保证操作次数相同 int aux = mat[0][0]; mat[0][2] = mat[0][1]; mat[0][3] = aux; #endif } } //获取当前CPU时间 __u64 rdtsc() { __u32 lo, hi; __asm__ __volatile__( "rdtsc" : "=a"(lo), "=d"(hi)); return (__u64)hi << 32 | lo; } //获取线程ID pid_t gettid( void ) { return syscall( __NR_gettid ); } //CAS无锁操作 int mutex = 0; int lock = 0; int unlock = 1; void *thread_routine_cas( void *arg ) { int i; int proc_num = (int)(long)arg; __u64 begin, end; struct timeval tv_begin,tv_end; __u64 timeinterval; cpu_set_t set; //线程绑定CPU CPU_ZERO( &set ); CPU_SET( proc_num, &set ); if (sched_setaffinity( gettid(), sizeof( cpu_set_t ), &set )) { perror( "sched_setaffinity" ); return NULL; } //计时 begin = rdtsc(); gettimeofday(&tv_begin,NULL); for (int i = 0; i < INC_TO; i++) { while (!__sync_bool_compare_and_swap(&mutex, lock, 1)) usleep(100); //为啥不加延时反而cache miss的时间要低? // {} global_int++; transpose(); __sync_bool_compare_and_swap(&mutex, unlock, 0); } gettimeofday(&tv_end,NULL); end = rdtsc(); timeinterval =(tv_end.tv_sec - tv_begin.tv_sec)*1000000 +(tv_end.tv_usec - tv_begin.tv_usec); //输出 fprintf(stderr, "处理核心 %d 的 __sync_bool_compare_and_swap 操作花费 %llu CPU cycle,花费 %llu us\n", proc_num, end - begin, timeinterval); return NULL; } //使用互斥pthread_mutex_lock的的加锁操作 void *thread_routine_mutex( void *arg ) { int i; int proc_num = (int)(long)arg; __u64 begin, end; struct timeval tv_begin,tv_end; __u64 timeinterval; cpu_set_t set; //线程绑定CPU CPU_ZERO( &set ); CPU_SET( proc_num, &set ); if (sched_setaffinity( gettid(), sizeof( cpu_set_t ), &set )) { perror( "sched_setaffinity" ); return NULL; } //计时 begin = rdtsc(); gettimeofday(&tv_begin,NULL); for (int i = 0; i < INC_TO; i++){ pthread_mutex_lock(&count_lock); global_int++; transpose(); pthread_mutex_unlock(&count_lock); } gettimeofday(&tv_end,NULL); end = rdtsc(); timeinterval =(tv_end.tv_sec - tv_begin.tv_sec)*1000000 +(tv_end.tv_usec - tv_begin.tv_usec); //输出 fprintf(stderr, "处理核心 %d 的 pthread lock 操作花费 %llu CPU cycle,花费 %llu us\n", proc_num, end - begin, timeinterval); return NULL; } int main() { int procs = 0; pthread_t *thrs; // 获取CPU数量 procs = (int)sysconf( _SC_NPROCESSORS_ONLN ); if (procs < 0) { perror( "sysconf" ); return -1; } // 只使用其中2个核心 if (procs>2) { procs = 2; } //按CPU数量为线程申请内存 thrs = (pthread_t* )malloc(sizeof(pthread_t) * procs); if (thrs == NULL) { perror( "malloc" ); return -1; } //CACHE模式提示 #if CACHE_MISS printf("CACHE MISS TEST\n"); #else printf("CACHE HIT TEST\n"); #endif //使用加锁还是CAS #if USE_MUTEX printf("Mutex Test\n"); //创建线程并执行 printf("Starting %d threads...\n", procs); for (int i = 0; i < procs; i++) { if (pthread_create( &thrs[i], NULL, thread_routine_mutex, (void *)(long)i )) { perror( "pthread_create" ); procs = i; break; } } #else printf("CAS Test\n"); //创建线程并执行 printf("Starting %d threads...\n", procs); for (int i = 0; i < procs; i++) { if (pthread_create(&thrs[i], NULL, thread_routine_cas, (void *)(long)i)) { perror("pthread_create"); procs = i; break; } } #endif //等待所有线程执行完毕后释放内存 for (int i = 0; i < procs; i++) { pthread_join(thrs[i], NULL); } free( thrs ); //打印输出结果 printf("实际累加结果为: %d\n", global_int ); printf("期望累加结果为: %d\n", procs * INC_TO); return 0; }
测试结果如下: