这几天研究了一下多核平台的同步机制的性能测试。主要是对比了自旋CAS锁和互斥锁的性能差距。
自旋锁是一种乐观锁,它认为共享的变量被其他线程占据时不会占据太长的时间,因此,自旋锁会在死循环里不断检查锁状态,而不是像互斥锁一样将当前任务挂起(互斥锁是一种悲观锁)。通常来讲,切换进程需要消耗的资源要大于在while里多循环几次,因此从理论上来讲,自旋锁对于共享资源不会被长时间占据的情况下,性能会优于互斥锁。
自旋锁的一种实现方式是CAS(compare and swap)操作,CAS指令执行时,当且仅当内存地址V的值与预期值A相等时,将内存地址V的值修改为B,否则就什么都不做。整个比较并替换的操作是一个原子操作。GNU提供了现成的函数__sync_bool_compare_and_swap。当然,C++、Windows、Java等也提供了各自的类似函数。)
CAS方式设计的自旋锁的优点在于十分轻量,不断的while循环检查即可,但也有一个致命缺点,就是ABA问题,如果在下一次检查前,原来的数据A被换为B又换为A后,便会错误的认为检查结果和预期值一致,实际上数据发生过了更改。对于无锁链表很容易遇到这类问题,解决办法可以是添加“版本号”进行额外检查,任何操作都会使版本号加一,这样即使还是A,但是版本号不同了。
对于互斥锁,GNU的线程相关的类也提供了类似的现成操作:pthread_mutex_lock和pthread_mutex_unlock。在资源被当前线程上锁后,其他线程访问该资源时会获取锁失败,并被挂起。
影响同步性能的除了锁的复杂性以外,还有一个重要因素就是CACHE命中。CACHE是CPU和内存之间数据交换的一个高速中间缓冲。当CPU需要数据的时候,数据会首先从内存拷贝到CACHE中,CPU中CACHE中取数据。为了效率,通常会把所需数据周围的内存一块拷贝到CACHE中。举例,假设CPU需要变量A,此时包含变量的A的内存块被拷贝到CACHE,CPU用完变量A后接下来需要变量B,如果变量B正好挨着变量A存放,那么CPU可以直接从CACHE中取出变量B,这叫做缓存命中,程序的执行效率就会比较高。如果不幸,变量B的位置和变量A间隔太远,超过了CACHE的大小,那么CPU不得不发出请求,从二级缓存、三级缓存甚至内存中,去寻找变量B,把变量B复制到CACHE中来,这就是缓存MISS。两种情况一比较,很明显,AB相邻的情况程序执行效率高,这就是缓存友好型代码。一个很经典的例子就是给二维数组赋值,当数组大小超过CACHE大小后,按行赋值的效率会比按列赋值的速度要快,就是因为,所取的一行数据,总是相邻的,因此发生miss的概率就要低。
下面是同步机制的测试代码和测试结果。测试思路就是多个线程对同一个矩阵进行翻转,因此需要上锁。如果要测试缓存miss情况下的同步效率则不做真实翻转,只对数组前几个元素做简单的赋值操作,使总操作次数与真实翻转所需要的次数一致即可。
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <sched.h>
#include <linux/unistd.h>
#include <sys/syscall.h>
#include <errno.h>
#include <linux/types.h>
#include <sys/time.h>
#include <stdbool.h>
#define INC_TO 10000 // 累加次数
#define USE_MUTEX 0 //使用mutex加锁测试还是使用CAS测试
#define CACHE_MISS 0 //是否miss缓存
int global_int = 0;
pthread_mutex_t count_lock = PTHREAD_MUTEX_INITIALIZER;
//矩阵翻转
#define MATSIZE 128
int mat[MATSIZE][MATSIZE];
void transpose()
{
for (int i = 0; i < MATSIZE; i++)
for (int j = 0; j < MATSIZE; j++)
{
#if CACHE_MISS
//真的翻转矩阵
int aux = mat[i][j];
mat[i][j] = mat[j][i];
mat[j][i] = aux;
#else
//假的翻转矩阵,随便做些无意义操作,保证操作次数相同
int aux = mat[0][0];
mat[0][2] = mat[0][1];
mat[0][3] = aux;
#endif
}
}
//获取当前CPU时间
__u64 rdtsc()
{
__u32 lo, hi;
__asm__ __volatile__(
"rdtsc"
: "=a"(lo), "=d"(hi));
return (__u64)hi << 32 | lo;
}
//获取线程ID
pid_t gettid( void )
{
return syscall( __NR_gettid );
}
//CAS无锁操作
int mutex = 0;
int lock = 0;
int unlock = 1;
void *thread_routine_cas( void *arg )
{
int i;
int proc_num = (int)(long)arg;
__u64 begin, end;
struct timeval tv_begin,tv_end;
__u64 timeinterval;
cpu_set_t set;
//线程绑定CPU
CPU_ZERO( &set );
CPU_SET( proc_num, &set );
if (sched_setaffinity( gettid(), sizeof( cpu_set_t ), &set ))
{
perror( "sched_setaffinity" );
return NULL;
}
//计时
begin = rdtsc();
gettimeofday(&tv_begin,NULL);
for (int i = 0; i < INC_TO; i++)
{
while (!__sync_bool_compare_and_swap(&mutex, lock, 1))
usleep(100); //为啥不加延时反而cache miss的时间要低?
// {}
global_int++;
transpose();
__sync_bool_compare_and_swap(&mutex, unlock, 0);
}
gettimeofday(&tv_end,NULL);
end = rdtsc();
timeinterval =(tv_end.tv_sec - tv_begin.tv_sec)*1000000 +(tv_end.tv_usec - tv_begin.tv_usec);
//输出
fprintf(stderr, "处理核心 %d 的 __sync_bool_compare_and_swap 操作花费 %llu CPU cycle,花费 %llu us\n",
proc_num, end - begin, timeinterval);
return NULL;
}
//使用互斥pthread_mutex_lock的的加锁操作
void *thread_routine_mutex( void *arg )
{
int i;
int proc_num = (int)(long)arg;
__u64 begin, end;
struct timeval tv_begin,tv_end;
__u64 timeinterval;
cpu_set_t set;
//线程绑定CPU
CPU_ZERO( &set );
CPU_SET( proc_num, &set );
if (sched_setaffinity( gettid(), sizeof( cpu_set_t ), &set ))
{
perror( "sched_setaffinity" );
return NULL;
}
//计时
begin = rdtsc();
gettimeofday(&tv_begin,NULL);
for (int i = 0; i < INC_TO; i++){
pthread_mutex_lock(&count_lock);
global_int++;
transpose();
pthread_mutex_unlock(&count_lock);
}
gettimeofday(&tv_end,NULL);
end = rdtsc();
timeinterval =(tv_end.tv_sec - tv_begin.tv_sec)*1000000 +(tv_end.tv_usec - tv_begin.tv_usec);
//输出
fprintf(stderr, "处理核心 %d 的 pthread lock 操作花费 %llu CPU cycle,花费 %llu us\n",
proc_num, end - begin, timeinterval);
return NULL;
}
int main()
{
int procs = 0;
pthread_t *thrs;
// 获取CPU数量
procs = (int)sysconf( _SC_NPROCESSORS_ONLN );
if (procs < 0)
{
perror( "sysconf" );
return -1;
}
// 只使用其中2个核心
if (procs>2)
{
procs = 2;
}
//按CPU数量为线程申请内存
thrs = (pthread_t* )malloc(sizeof(pthread_t) * procs);
if (thrs == NULL)
{
perror( "malloc" );
return -1;
}
//CACHE模式提示
#if CACHE_MISS
printf("CACHE MISS TEST\n");
#else
printf("CACHE HIT TEST\n");
#endif
//使用加锁还是CAS
#if USE_MUTEX
printf("Mutex Test\n");
//创建线程并执行
printf("Starting %d threads...\n", procs);
for (int i = 0; i < procs; i++)
{
if (pthread_create( &thrs[i], NULL, thread_routine_mutex,
(void *)(long)i ))
{
perror( "pthread_create" );
procs = i;
break;
}
}
#else
printf("CAS Test\n");
//创建线程并执行
printf("Starting %d threads...\n", procs);
for (int i = 0; i < procs; i++)
{
if (pthread_create(&thrs[i], NULL, thread_routine_cas,
(void *)(long)i))
{
perror("pthread_create");
procs = i;
break;
}
}
#endif
//等待所有线程执行完毕后释放内存
for (int i = 0; i < procs; i++)
{
pthread_join(thrs[i], NULL);
}
free( thrs );
//打印输出结果
printf("实际累加结果为: %d\n", global_int );
printf("期望累加结果为: %d\n", procs * INC_TO);
return 0;
}
测试结果如下:
