这几天研究了一下多核平台的同步机制的性能测试。主要是对比了自旋CAS锁和互斥锁的性能差距。

自旋锁是一种乐观锁,它认为共享的变量被其他线程占据时不会占据太长的时间,因此,自旋锁会在死循环里不断检查锁状态,而不是像互斥锁一样将当前任务挂起(互斥锁是一种悲观锁)。通常来讲,切换进程需要消耗的资源要大于在while里多循环几次,因此从理论上来讲,自旋锁对于共享资源不会被长时间占据的情况下,性能会优于互斥锁。

自旋锁的一种实现方式是CAS(compare and swap)操作,CAS指令执行时,当且仅当内存地址V的值与预期值A相等时,将内存地址V的值修改为B,否则就什么都不做。整个比较并替换的操作是一个原子操作。GNU提供了现成的函数__sync_bool_compare_and_swap。当然,C++、Windows、Java等也提供了各自的类似函数。)

CAS方式设计的自旋锁的优点在于十分轻量,不断的while循环检查即可,但也有一个致命缺点,就是ABA问题,如果在下一次检查前,原来的数据A被换为B又换为A后,便会错误的认为检查结果和预期值一致,实际上数据发生过了更改。对于无锁链表很容易遇到这类问题,解决办法可以是添加“版本号”进行额外检查,任何操作都会使版本号加一,这样即使还是A,但是版本号不同了。

对于互斥锁,GNU的线程相关的类也提供了类似的现成操作:pthread_mutex_lockpthread_mutex_unlock。在资源被当前线程上锁后,其他线程访问该资源时会获取锁失败,并被挂起。

影响同步性能的除了锁的复杂性以外,还有一个重要因素就是CACHE命中。CACHE是CPU和内存之间数据交换的一个高速中间缓冲。当CPU需要数据的时候,数据会首先从内存拷贝到CACHE中,CPU中CACHE中取数据。为了效率,通常会把所需数据周围的内存一块拷贝到CACHE中。举例,假设CPU需要变量A,此时包含变量的A的内存块被拷贝到CACHE,CPU用完变量A后接下来需要变量B,如果变量B正好挨着变量A存放,那么CPU可以直接从CACHE中取出变量B,这叫做缓存命中,程序的执行效率就会比较高。如果不幸,变量B的位置和变量A间隔太远,超过了CACHE的大小,那么CPU不得不发出请求,从二级缓存、三级缓存甚至内存中,去寻找变量B,把变量B复制到CACHE中来,这就是缓存MISS。两种情况一比较,很明显,AB相邻的情况程序执行效率高,这就是缓存友好型代码。一个很经典的例子就是给二维数组赋值,当数组大小超过CACHE大小后,按行赋值的效率会比按列赋值的速度要快,就是因为,所取的一行数据,总是相邻的,因此发生miss的概率就要低。

下面是同步机制的测试代码和测试结果。测试思路就是多个线程对同一个矩阵进行翻转,因此需要上锁。如果要测试缓存miss情况下的同步效率则不做真实翻转,只对数组前几个元素做简单的赋值操作,使总操作次数与真实翻转所需要的次数一致即可。

#ifndef _GNU_SOURCE
    #define _GNU_SOURCE
#endif

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <sched.h>
#include <linux/unistd.h>
#include <sys/syscall.h>
#include <errno.h>
#include <linux/types.h>
#include <sys/time.h>
#include <stdbool.h>

#define INC_TO          10000        // 累加次数
#define USE_MUTEX       0           //使用mutex加锁测试还是使用CAS测试
#define CACHE_MISS      0           //是否miss缓存

int global_int = 0;
pthread_mutex_t count_lock = PTHREAD_MUTEX_INITIALIZER;

//矩阵翻转
#define MATSIZE 128
int mat[MATSIZE][MATSIZE];
void transpose()
{
    for (int i = 0; i < MATSIZE; i++)
        for (int j = 0; j < MATSIZE; j++)
        {
#if CACHE_MISS
            //真的翻转矩阵
            int aux = mat[i][j];
            mat[i][j] = mat[j][i];
            mat[j][i] = aux;
#else
            //假的翻转矩阵,随便做些无意义操作,保证操作次数相同
            int aux = mat[0][0];
            mat[0][2] = mat[0][1];
            mat[0][3] = aux;
#endif
        }
}

//获取当前CPU时间
__u64 rdtsc()
{
    __u32 lo, hi;

    __asm__ __volatile__(
        "rdtsc"
        : "=a"(lo), "=d"(hi));

    return (__u64)hi << 32 | lo;
}

//获取线程ID
pid_t gettid( void )
{
    return syscall( __NR_gettid );
}

//CAS无锁操作
int mutex = 0;
int lock = 0;
int unlock = 1;
void *thread_routine_cas( void *arg )
{
    int i;
    int proc_num = (int)(long)arg;
    __u64 begin, end;
    struct timeval tv_begin,tv_end;
    __u64 timeinterval;
    cpu_set_t set;

    //线程绑定CPU
    CPU_ZERO( &set );
    CPU_SET( proc_num, &set );
    if (sched_setaffinity( gettid(), sizeof( cpu_set_t ), &set ))
    {
        perror( "sched_setaffinity" );
        return NULL;
    }

    //计时
    begin = rdtsc();
    gettimeofday(&tv_begin,NULL);

    for (int i = 0; i < INC_TO; i++)
    {
        while (!__sync_bool_compare_and_swap(&mutex, lock, 1))
            usleep(100); //为啥不加延时反而cache miss的时间要低?
        // {}
        global_int++; 
        transpose();
        __sync_bool_compare_and_swap(&mutex, unlock, 0);
    }

    gettimeofday(&tv_end,NULL);
    end = rdtsc();
    timeinterval =(tv_end.tv_sec - tv_begin.tv_sec)*1000000 +(tv_end.tv_usec - tv_begin.tv_usec);
    
    //输出
    fprintf(stderr, "处理核心 %d 的 __sync_bool_compare_and_swap 操作花费 %llu CPU cycle,花费 %llu us\n",
            proc_num, end - begin, timeinterval);

    return NULL;
}

//使用互斥pthread_mutex_lock的的加锁操作
void *thread_routine_mutex( void *arg )
{
    int i;
    int proc_num = (int)(long)arg;
    __u64 begin, end;

    struct timeval tv_begin,tv_end;
    __u64 timeinterval;
    cpu_set_t set;

    //线程绑定CPU
    CPU_ZERO( &set );
    CPU_SET( proc_num, &set );
    if (sched_setaffinity( gettid(), sizeof( cpu_set_t ), &set ))
    {
        perror( "sched_setaffinity" );
        return NULL;
    }

    //计时
    begin = rdtsc();
    gettimeofday(&tv_begin,NULL);

    for (int i = 0; i < INC_TO; i++){
        pthread_mutex_lock(&count_lock);
        global_int++;
        transpose();
        pthread_mutex_unlock(&count_lock);
    }

    gettimeofday(&tv_end,NULL);
    end = rdtsc();
    timeinterval =(tv_end.tv_sec - tv_begin.tv_sec)*1000000 +(tv_end.tv_usec - tv_begin.tv_usec);
    
    //输出
    fprintf(stderr, "处理核心 %d 的 pthread lock 操作花费 %llu CPU cycle,花费 %llu us\n",
            proc_num, end - begin, timeinterval);

    return NULL;
}

int main()
{
    int procs = 0;
    pthread_t *thrs;

    // 获取CPU数量
    procs = (int)sysconf( _SC_NPROCESSORS_ONLN );
    if (procs < 0)
    {
        perror( "sysconf" );
        return -1;
    }
    // 只使用其中2个核心
    if (procs>2)
    {
        procs = 2;
    }

    //按CPU数量为线程申请内存
    thrs = (pthread_t* )malloc(sizeof(pthread_t) * procs);
    if (thrs == NULL)
    {
        perror( "malloc" );
        return -1;
    }

    //CACHE模式提示
#if CACHE_MISS     
    printf("CACHE MISS TEST\n");
#else
    printf("CACHE HIT TEST\n");
#endif

    //使用加锁还是CAS
#if USE_MUTEX
    printf("Mutex Test\n");
    //创建线程并执行
    printf("Starting %d threads...\n", procs);
    for (int i = 0; i < procs; i++)
    {
        if (pthread_create( &thrs[i], NULL, thread_routine_mutex,
            (void *)(long)i ))
        {
            perror( "pthread_create" );
            procs = i;
            break;
        }
    }
#else
    printf("CAS Test\n");
    //创建线程并执行
    printf("Starting %d threads...\n", procs);
    for (int i = 0; i < procs; i++)
    {
        if (pthread_create(&thrs[i], NULL, thread_routine_cas,
                           (void *)(long)i))
        {
            perror("pthread_create");
            procs = i;
            break;
        }
    }
#endif

    //等待所有线程执行完毕后释放内存
    for (int i = 0; i < procs; i++)
    {
        pthread_join(thrs[i], NULL);
    }
    free( thrs );

    //打印输出结果
    printf("实际累加结果为: %d\n", global_int );
    printf("期望累加结果为: %d\n", procs * INC_TO);

    return 0;
}

测试结果如下: