• DPDK系列之三十四DPDK并行机制的同步控制


    一、同步

    Synchronization,同步。很好理解,简单的可以认为是同一个节奏。引申到计算机,其实就是多任务保持一致性。这个一致性一般是指数据,也可能会是操作。在单任务单线(进)程中,同步是天然的。在多任务无共同操作条件下,同步没有意义。
    同步一般是在多线(进)程(多任务)且这些线程之间必须有共同操作部分时,才会有意义。
    以前说过,计算机中的应用和现实的应用是映射。什么意思呢?如果计算机解决的问题,在现实世界上都可以找到原型。比如同步,举一个例子:如果测试学生做操的成绩,各自单独测试即可。可要测试班级操的成绩,这个即使每个学生做的都非常好,但各自做各自的,那么成绩也不一定好。只有同一班的学生保持一致(同步),这样才成绩更优秀。

    二、并行和并发中的同步

    刚刚说过,在单线程和多线程的某些情况下,同步是没有意义的。所以,同步一定是在并发和并行中且有交互的情况下,才有意义。而并发和并行,又增加了同步的复杂性。但是当把同步抽象出来,其实就是具体到某一个粒度是串行的。也就是说,如何保证这个粒度是最合适的。也就引出了后面,使用哪种锁是最合适的话题。
    在开发者们学习多线程编程时,最先接触的同步方式一般是互斥体(Mutex)。它是一种重量级锁,既支持进程间的同步又支持线程间的同步方式。然后才是其它的如信号量、条件变量、临界区等等。使用互斥锁缺点主要是太耗费时间,而且一不小心还有可能产生死锁。
    那么,就产生了一种读写锁,读写锁其实是一种对锁机制的平衡,在正常的情况下,如果只是读取的话,其实数据自然是同步的,而且事实是在某些场合下,读是一种非常频繁的操作(比如缓存),所以这就可以不进行排它的读,减少锁的时间耗费。当然,如果判断一个线程要写数据时,需要启动排它性,只能它一个操作了。在Linux中设计了一个读写锁ReadWriteLock 。但是,一定要知道在什么情况下使用读写锁。比如,一个场景就是十个线程抢占式写一个数据区,那么读写锁反而没啥意义了。读写锁其实是两部分,一部分是读锁,需要支持多个持有者,一个是写锁,排它。在内核中使用提把一个int型分成高低两位,一部分用来处理读(两个字节,最多65535个读锁持有者),一部分用来处理写。读部分因为是共同持有,所以需要使用CAS进行处理。
    而在c++11中推出了automic,原子操作。当然,别的语言和别的情况下也各自有各自的原子操作的方式。所谓原子操作,就是不可再分解的意思。也就是说,这是串行化的,操作它,只能一个个轮流来,不能说你读一下我写一下乱序执行。原子操作其实就基本涉及到了硬件操作了,通过汇编的指令接口,来影响具体的执行过程。一般为说,原子操作和内存顺序也是有一定的关系的,需要大家注意。同时,不同的架构的CPU和不同的架构体系(SMP NUMA MPP)也有不同的处理方式,都需要根据实际情况来分析。原子操作只能操作一个变量,当然一个位操作也算一个变量,也就是原子位操作。
    还有一种比较常见的,也是目前在无锁编程中用得比较多的,自旋锁。自旋锁,就是自个儿旋转的锁?表面上看就是这么一个玩意儿。它底层其实是通过CAS不断的尝试去读取锁(表现出来就是一个循环,但其实底层如果支持PAUSE指令,则可以使用其),获取了就操作不获取就继续。
    CAS有几个经典的问题,ABA问题,时间耗费问题和只能操作一个变量(这也是前面把一个int分成高低两部分的原因)。但它也有闪光点啊。一般来说,在多线程编程中,当线程达到一定数量后,线程上下文的切换就成为了一个需要重视的点。互斥锁等会引起线程的切换及线程的睡眠,而CAS只是阻塞CPU而不会引起线程的退让,这在某些场景下,优势还是非常明显的。
    那么就可以明白,在只有一颗CPU一个核心的情况下,自旋锁没有意义,已经退化为普通的互斥体类似的机制,只有在多核多CPU的情况下才有意义。同时,自旋锁持有CPU不放,所以只能短时间使用,如果长时间需要锁住一个数据,就不能使用自旋锁。所以早期的自旋锁,基本都只是在内核中使用,后期才渐渐在应用层开始使用。同时为了安全,应用层使用自旋锁往往会提供一些安全机制,比如回退到某个时间点上会退化为普通锁。自旋锁在使用过程中是不允许递归的,同样在自旋锁的源码中可以看到其屏蔽了中断请求的函数(中断中如果也有自旋锁,这事儿就难办了)。
    当然,还有一些其它的同步操作方式,如内存栅栏等,它们之间或多或少都有一些相通之处。这里不再赘述。
    最后,所有的有锁编程,当然不如无锁编程更好。无锁并不是真正意义上的无锁,而把锁进一步下移到非上层控制,在上层表现为无传统意义上锁机制。可以简单的理解成使用指令级操作通过硬件来进行锁操作,速度更快。
    那么,开发中到底使用哪种锁最好呢?
    一般来说,没有最好,只有最合适。前面也简单分析了一些锁的应用场景,其实就是提醒开发者,在实际应用中,要根据实际情况动态选择使用哪种锁甚至混合使用这些锁。哪种方式能更好的实现目的,就用哪种,这才是正确的方法。

    三、DPDK中的同步及源码分析

    DPDK中同样也引入原子操作、读写锁和自旋锁三种方式。
    1、原子操作

    //rte_atomic.h
    #define	rte_mb() _mm_mfence()
    
    #define	rte_wmb() _mm_sfence()
    
    #define	rte_rmb() _mm_lfence()
    
    #define rte_smp_wmb() rte_compiler_barrier()
    
    #define rte_smp_rmb() rte_compiler_barrier()
    /**
     * Compiler barrier.
     *
     * Guarantees that operation reordering does not occur at compile time
     * for operations directly before and after the barrier.
     */
    #define	rte_compiler_barrier() do {		\
    	asm volatile ("" : : : "memory");	\
    } while(0)
    
    static inline void
    rte_atomic64_add(rte_atomic64_t *v, int64_t inc);
    
    #ifdef RTE_FORCE_INTRINSICS
    static inline void
    rte_atomic64_add(rte_atomic64_t *v, int64_t inc)
    {
    	__sync_fetch_and_add(&v->cnt, inc);
    }
    #endif
    
    /**
     * Atomically subtract a 64-bit value from a counter.
     *
     * @param v
     *   A pointer to the atomic counter.
     * @param dec
     *   The value to be subtracted from the counter.
     */
    static inline void
    rte_atomic64_sub(rte_atomic64_t *v, int64_t dec);
    
    #ifdef RTE_FORCE_INTRINSICS
    static inline void
    rte_atomic64_sub(rte_atomic64_t *v, int64_t dec)
    {
    	__sync_fetch_and_sub(&v->cnt, dec);
    }
    #endif
    
    /**
     * Atomically increment a 64-bit counter by one and test.
     *
     * @param v
     *   A pointer to the atomic counter.
     */
    static inline void
    rte_atomic64_inc(rte_atomic64_t *v);
    
    #ifdef RTE_FORCE_INTRINSICS
    static inline void
    rte_atomic64_inc(rte_atomic64_t *v)
    {
    	rte_atomic64_add(v, 1);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    2、读写锁

    //rte_rwlock.h
    /**
     * The rte_rwlock_t type.
     *
     * cnt is -1 when write lock is held, and > 0 when read locks are held.
     */
    typedef struct {
    	volatile int32_t cnt; /**< -1 when W lock held, > 0 when R locks held. */
    } rte_rwlock_t;
    
    /**
     * A static rwlock initializer.
     */
    #define RTE_RWLOCK_INITIALIZER { 0 }
    
    /**
     * Initialize the rwlock to an unlocked state.
     *
     * @param rwl
     *   A pointer to the rwlock structure.
     */
    static inline void
    rte_rwlock_init(rte_rwlock_t *rwl)
    {
    	rwl->cnt = 0;
    }
    
    /**
     * Take a read lock. Loop until the lock is held.
     *
     * @param rwl
     *   A pointer to a rwlock structure.
     */
    static inline void
    rte_rwlock_read_lock(rte_rwlock_t *rwl)
    {
    	int32_t x;
    	int success = 0;
    
    	while (success == 0) {
    		x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
    		/* write lock is held */
    		if (x < 0) {
    			rte_pause();
    			continue;
    		}
    		success = __atomic_compare_exchange_n(&rwl->cnt, &x, x + 1, 1,
    					__ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
    	}
    }
    
    /**
     * @warning
     * @b EXPERIMENTAL: this API may change without prior notice.
     *
     * try to take a read lock.
     *
     * @param rwl
     *   A pointer to a rwlock structure.
     * @return
     *   - zero if the lock is successfully taken
     *   - -EBUSY if lock could not be acquired for reading because a
     *     writer holds the lock
     */
    __rte_experimental
    static inline int
    rte_rwlock_read_trylock(rte_rwlock_t *rwl)
    {
    	int32_t x;
    	int success = 0;
    
    	while (success == 0) {
    		x = __atomic_load_n(&rwl->cnt, __ATOMIC_RELAXED);
    		/* write lock is held */
    		if (x < 0)
    			return -EBUSY;
    		success = __atomic_compare_exchange_n(&rwl->cnt, &x, x + 1, 1,
    					__ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
    	}
    
    	return 0;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83

    3、自旋锁

    //ret_spinlock.h
    static inline void
    rte_spinlock_lock(rte_spinlock_t *sl)
    {
    	int lock_val = 1;
    	asm volatile (
    			"1:\n"
    			"xchg %[locked], %[lv]\n"
    			"test %[lv], %[lv]\n"
    			"jz 3f\n"
    			"2:\n"
    			"pause\n"
    			"cmpl $0, %[locked]\n"
    			"jnz 2b\n"
    			"jmp 1b\n"
    			"3:\n"
    			: [locked] "=m" (sl->locked), [lv] "=q" (lock_val)
    			: "[lv]" (lock_val)
    			: "memory");
    }
    
    static inline void
    rte_spinlock_unlock (rte_spinlock_t *sl)
    {
    	int unlock_val = 0;
    	asm volatile (
    			"xchg %[locked], %[ulv]\n"
    			: [locked] "=m" (sl->locked), [ulv] "=q" (unlock_val)
    			: "[ulv]" (unlock_val)
    			: "memory");
    }
    
    static inline int
    rte_spinlock_trylock (rte_spinlock_t *sl)
    {
    	int lockval = 1;
    
    	asm volatile (
    			"xchg %[locked], %[lockval]"
    			: [locked] "=m" (sl->locked), [lockval] "=q" (lockval)
    			: "[lockval]" (lockval)
    			: "memory");
    
    	return lockval == 0;
    }
    #endif
    
    extern uint8_t rte_rtm_supported;
    
    static inline int rte_tm_supported(void)
    {
    	return rte_rtm_supported;
    }
    
    static inline int
    rte_try_tm(volatile int *lock)
    {
    	int i, retries;
    
    	if (!rte_rtm_supported)
    		return 0;
    
    	retries = RTE_RTM_MAX_RETRIES;
    
    	while (likely(retries--)) {
    
    		unsigned int status = rte_xbegin();
    
    		if (likely(RTE_XBEGIN_STARTED == status)) {
    			if (unlikely(*lock))
    				rte_xabort(RTE_XABORT_LOCK_BUSY);
    			else
    				return 1;
    		}
    		while (*lock)
    			rte_pause();
    
    		if ((status & RTE_XABORT_CONFLICT) ||
    		   ((status & RTE_XABORT_EXPLICIT) &&
    		    (RTE_XABORT_CODE(status) == RTE_XABORT_LOCK_BUSY))) {
    			/* add a small delay before retrying, basing the
    			 * delay on the number of times we've already tried,
    			 * to give a back-off type of behaviour. We
    			 * randomize trycount by taking bits from the tsc count
    			 */
    			int try_count = RTE_RTM_MAX_RETRIES - retries;
    			int pause_count = (rte_rdtsc() & 0x7) | 1;
    			pause_count <<= try_count;
    			for (i = 0; i < pause_count; i++)
    				rte_pause();
    			continue;
    		}
    
    		if ((status & RTE_XABORT_RETRY) == 0) /* do not retry */
    			break;
    	}
    	return 0;
    }
    
    static inline void
    rte_spinlock_lock_tm(rte_spinlock_t *sl)
    {
    	if (likely(rte_try_tm(&sl->locked)))
    		return;
    
    	rte_spinlock_lock(sl); /* fall-back */
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107

    4、DPDK无锁队列
    这里多说一句,DPDK的无锁队列是通过CAS来实现的,目前无锁队列也是应用CAS最广泛的一种应用。这个队列在前面分析过,这里就不再重贴代码了。

    上面的代码都只是一部分的实现,有兴趣可以去DPDK的相关目录下找到这些文件,认真分析一下。至于如何调用的,可以直接查看引用,在代码中看这些API是如何调用的,基本就可以知道这几类锁在DPDK中的应用情况。

    四、总结

    锁的机制在不同的层次上有着不同的表述和表现形式,在不同的语言上可能又会衍生出不同的形式。这就需要开发者去仔细的分析其真正使用的哪种锁,从而能更好的对其进行应用。可以试着看一下Mutex在内核中是如何实现的(futex机制),其实就明白了上面的话的意思。
    底层建设一直是重要的环节,侯捷老师说过:勿在浮沙筑高台。

  • 相关阅读:
    C++ Reference: Standard C++ Library reference: C Library: cstring: memchr
    基于SSM+Vue的体育馆管理系统的设计与实现
    Vue 3 中的 Composition API 详解
    数据结构与算法之Hash&BitMap
    vue3学习(七)--- Teleport传送组件
    Ubuntu安装git方法
    第160场直播带货数据分享
    基于vue3 + ant-design 自定义SVG图标iconfont的解决方案;ant-design加载本地iconfont.js不显示图标问题
    《计算机视觉40例》内容简介
    Spring Data JPA 中的分页和排序
  • 原文地址:https://blog.csdn.net/fpcc/article/details/133840234