上一篇笔记中对x86平台上原子变量、关中断、自旋锁和信号量的原理做了复习,本笔记回顾一下Linux使用的几种常用的同步机制。
Linux上的原子变量
Linux上提供了一个atomic_t类型表示原子变量。32位和64位版本的结构体定义如下:
typedef struct {int counter;
} atomic_t;//常用的32位的原子变量类型
#ifdef CONFIG_64BIT
typedef struct {s64 counter;
} atomic64_t;//64位的原子变量类型
#endif
在Linux中,操作原子变量要通过专门的接口来实现(各个平台的汇编会有所不同,本笔记都是按照x86版本来看),以下列举几个最基础的:
/*** atomic_read - read atomic variable* @v: pointer of type atomic_t** Atomically reads the value of @v.*/
static __always_inline int atomic_read(const atomic_t *v)
{return READ_ONCE((v)->counter);
}/*** atomic_set - set atomic variable* @v: pointer of type atomic_t* @i: required value** Atomically sets the value of @v to @i.*/
static __always_inline void atomic_set(atomic_t *v, int i)
{WRITE_ONCE(v->counter, i);
}/*** atomic_add - add integer to atomic variable* @i: integer value to add* @v: pointer of type atomic_t** Atomically adds @i to @v.*/
static __always_inline void atomic_add(int i, atomic_t *v)
{asm volatile(LOCK_PREFIX "addl %1,%0": "+m" (v->counter): "ir" (i));
}/*** atomic_sub - subtract integer from atomic variable* @i: integer value to subtract* @v: pointer of type atomic_t** Atomically subtracts @i from @v.*/
static __always_inline void atomic_sub(int i, atomic_t *v)
{asm volatile(LOCK_PREFIX "subl %1,%0": "+m" (v->counter): "ir" (i));
}/*** atomic_inc - increment atomic variable* @v: pointer of type atomic_t** Atomically increments @v by 1.*/
static __always_inline void atomic_inc(atomic_t *v)
{asm volatile(LOCK_PREFIX "incl %0": "+m" (v->counter));
}/*** atomic_dec - decrement atomic variable* @v: pointer of type atomic_t** Atomically decrements @v by 1.*/
static __always_inline void atomic_dec(atomic_t *v)
{asm volatile(LOCK_PREFIX "decl %0": "+m" (v->counter));
}
可以看到,核心的东西和上一节介绍的没有区别。LOCK_PREFIX在SMP系统中就是lock指令,单核系统中则为空串。
Linux下中断控制
Linux下中断控制的基础接口如下:
//实际保存eflags寄存器
extern __always_inline unsigned long native_save_fl(void){unsigned long flags;asm volatile("# __raw_save_flags\n\t""pushf ; pop %0":"=rm"(flags)::"memory");return flags;
}
//实际恢复eflags寄存器
extern inline void native_restore_fl(unsigned long flags){asm volatile("push %0 ; popf"::"g"(flags):"memory","cc");
}
//实际关中断
static __always_inline void native_irq_disable(void){asm volatile("cli":::"memory");
}
//实际开启中断
static __always_inline void native_irq_enable(void){asm volatile("sti":::"memory");
}
//arch层关中断
static __always_inline void arch_local_irq_disable(void){native_irq_disable();
}
//arch层开启中断
static __always_inline void arch_local_irq_enable(void){ native_irq_enable();
}
//arch层保存eflags寄存器
static __always_inline unsigned long arch_local_save_flags(void){return native_save_fl();
}
//arch层恢复eflags寄存器
static __always_inline void arch_local_irq_restore(unsigned long flags){native_restore_fl(flags);
}
//实际保存eflags寄存器并关中断
static __always_inline unsigned long arch_local_irq_save(void){unsigned long flags = arch_local_save_flags();arch_local_irq_disable();return flags;
}
//raw层关闭开启中断宏
#define raw_local_irq_disable() arch_local_irq_disable()
#define raw_local_irq_enable() arch_local_irq_enable()
//raw层保存恢复eflags寄存器宏
#define raw_local_irq_save(flags) \do { \typecheck(unsigned long, flags); \flags = arch_local_irq_save(); \} while (0)#define raw_local_irq_restore(flags) \do { \typecheck(unsigned long, flags); \arch_local_irq_restore(flags); \} while (0)#define raw_local_save_flags(flags) \do { \typecheck(unsigned long, flags); \flags = arch_local_save_flags(); \} while (0)
//通用层接口宏
#define local_irq_enable() \do { \raw_local_irq_enable(); \} while (0)#define local_irq_disable() \do { \raw_local_irq_disable(); \} while (0)#define local_irq_save(flags) \do { \raw_local_irq_save(flags); \} while (0)#define local_irq_restore(flags) \do { \raw_local_irq_restore(flags); \} while (0)
Linux自旋锁
typedef struct raw_spinlock {arch_spinlock_t raw_lock;
#ifdef CONFIG_GENERIC_LOCKBREAKunsigned int break_lock;
#endif
#ifdef CONFIG_DEBUG_SPINLOCKunsigned int magic, owner_cpu;void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOCstruct lockdep_map dep_map;
#endif
} raw_spinlock_t;typedef struct spinlock {union {struct raw_spinlock rlock;#ifdef CONFIG_DEBUG_LOCK_ALLOC
# define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))struct {u8 __padding[LOCK_PADSIZE];struct lockdep_map dep_map;};
#endif};
} spinlock_t;
忽略掉调试相关的代码,结构体最重要的是arch_spinlock_t,可以看出各个体系架构下会有所不同。
加锁操作
#define raw_spin_lock(lock) _raw_spin_lock(lock)static inline void spin_lock(spinlock_t *lock)
{raw_spin_lock(&lock->rlock);
}
spinlock分为SMP版本和UP版本(include/linux/spinlock_api_smp.h ,include/linux/spinlock_api_up.h),以SMP版本为例来分析。SMP版本中,_raw_spin_lock为声明为:
void __lockfunc _raw_spin_lock(raw_spinlock_t *lock) __acquires(lock);static inline void __raw_spin_lock(raw_spinlock_t *lock)
{// 禁止抢占preempt_disable();// for debugspin_acquire(&lock->dep_map, 0, 0, _RET_IP_);// real work done hereLOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}
do_raw_spin_trylock和do_raw_spin_lock相关代码是:
void do_raw_spin_lock(raw_spinlock_t *lock)
{debug_spin_lock_before(lock);arch_spin_lock(&lock->raw_lock);debug_spin_lock_after(lock);
}int do_raw_spin_trylock(raw_spinlock_t *lock)
{int ret = arch_spin_trylock(&lock->raw_lock);if (ret)debug_spin_lock_after(lock);
#ifndef CONFIG_SMP/** Must not happen on UP:*/SPIN_BUG_ON(!ret, lock, "trylock failure on UP");
#endifreturn ret;
}
可以看到,主要是调用arch_spin_trylock和arch_spin_lock,从名字看就知道它们依赖于具体的体系结构,x86平台对应的是(每个体系架构下应该会有一个spinlock.h文件表明它所使用的spinlock函数,可以看这个文件来追踪、、)queued_spin_trylock和queued_spin_lock(kernel 4.14)。
static __always_inline int queued_spin_trylock(struct qspinlock *lock)
{if (!atomic_read(&lock->val) &&(atomic_cmpxchg_acquire(&lock->val, 0, _Q_LOCKED_VAL) == 0))return 1;return 0;
}static __always_inline void queued_spin_lock(struct qspinlock *lock)
{u32 val;val = atomic_cmpxchg_acquire(&lock->val, 0, _Q_LOCKED_VAL);if (likely(val == 0))return;queued_spin_lock_slowpath(lock, val);
}
注:include/asm-generic/qspinlock.h里将arch_spin_trylock和arch_spin_lock定义成了queued_spin_trylock和queued_spin_lock。
其中atomic_cmpxchg_accquire对应atomic_cmpxchg,最终调用__raw_cmpxchg,核心就是使用了x86的cmpxchgb、cmpxchgw、cmpxchgl、cmpxchgq指令。
想要深入研究指令的可以百度,这里我们主要关注atomic_cmpxchg本身的功能,这个函数的原型是:
static __always_inline int atomic_cmpxchg(atomic_t *v, int old, int new);
它的功能是原子实现比较和交换过程,对比old和v的值,如果相等,则将new存储到v中,返回旧值;如果不等,返回v的值。
因此queued_spin_trylock的主要工作就是对比一下lock当前值是否为0(未加锁),如果是则加锁后返回;如果没有成功则直接返回。
queued_spin_lock则首先尝试用trylock方式加锁,如果失败则进入slowpath方式加锁。这个slowpath里会循环等待锁释放后再次进行加锁操作(kernel/locking/qspinlock.c)。
如果还想要深入了解queued spinlock细节,可以参考这篇文章:
Linux内核同步机制之(九):Queued spinlock前言 本站之前已经有了一篇关于http://www.wowotech.net/kernel_synchronization/queued_spinlock.html linux下spinlock的发展也是由简单变复杂的,想要进一步了解的,可以参考这篇文章
PV qspinlock原理_小写的毛毛的博客-CSDN博客_spinlock实现原理1 前言自旋锁(spinlock)是用来在多处理器环境中工作的一种锁。如果内核控制路径发现spinlock是unlock,就获取锁并继续执行;相反,如果内核控制路径发现锁由运行在另一个CPU上的内核控制路径lock,就在周围“旋转”,反复执行一条紧凑的循环指令,直到锁被释放。spinlock的循环指令表示“忙等”:即使等待的内核控制路径无事可做(除了浪费时间),它也在CPU上保持运行。spinlock的实现依赖这样一个假设:锁的持有线程和等待线程都不能被抢占。但是在虚拟化场景下,vCPU可能在任意时刻https://blog.csdn.net/bemind1/article/details/118224344
Linux信号量
Linux下信号量的数据结构定义如下:
struct semaphore{raw_spinlock_t lock;//保护信号量自身的自旋锁unsigned int count;//信号量值struct list_head wait_list;//挂载睡眠等待进程的链表
};
对信号量操作的核心接口是down和up,主要代码如下:
static inline int __sched __down_common(struct semaphore *sem, long state,long timeout)
{struct semaphore_waiter waiter;//把waiter加入sem->wait_list的头部list_add_tail(&waiter.list, &sem->wait_list);waiter.task = current;//current表示当前进程,即调用该函数的进程waiter.up = false;for (;;) {if (signal_pending_state(state, current))goto interrupted;if (unlikely(timeout <= 0))goto timed_out;__set_current_state(state);//设置当前进程的状态,进程睡眠,即先前__down函数中传入的TASK_UNINTERRUPTIBLE:该状态是等待资源有效时唤醒(比如等待键盘输入、socket连接、信号(signal)等等),但不可以被中断唤醒raw_spin_unlock_irq(&sem->lock);//释放在down函数中加的锁timeout = schedule_timeout(timeout);//真正进入睡眠raw_spin_lock_irq(&sem->lock);//进程下次运行会回到这里,所以要加锁if (waiter.up)return 0;}timed_out:list_del(&waiter.list);return -ETIME;interrupted:list_del(&waiter.list);return -EINTR;//为了简单起见处理进程信号(signal)和超时的逻辑代码我已经删除
}
//进入睡眠等待
static noinline void __sched __down(struct semaphore *sem)
{__down_common(sem, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}
//获取信号量
void down(struct semaphore *sem)
{unsigned long flags;//对信号量本身加锁并关中断,也许另一段代码也在操作该信号量raw_spin_lock_irqsave(&sem->lock, flags);if (likely(sem->count > 0))sem->count--;//如果信号量值大于0,则对其减1else__down(sem);//否则让当前进程进入睡眠raw_spin_unlock_irqrestore(&sem->lock, flags);
}
//实际唤醒进程
static noinline void __sched __up(struct semaphore *sem)
{struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list, struct semaphore_waiter, list);//获取信号量等待链表中的第一个数据结构semaphore_waiter,它里面保存着睡眠进程的指针list_del(&waiter->list);waiter->up = true;wake_up_process(waiter->task);//唤醒进程重新加入调度队列
}
//释放信号量
void up(struct semaphore *sem)
{unsigned long flags;//对信号量本身加锁并关中断,必须另一段代码也在操作该信号量raw_spin_lock_irqsave(&sem->lock, flags);if (likely(list_empty(&sem->wait_list)))sem->count++;//如果信号量等待链表中为空,则对信号量值加1else__up(sem);//否则执行唤醒进程相关的操作raw_spin_unlock_irqrestore(&sem->lock, flags);
}
信号量的代码本身流程逻辑不算复杂,需要注意的是schedule_timeout的下一条语句是进程被唤醒后回来执行的地方。
Linux读写锁
在实际场景中,可能会碰到这种情况:有一个复杂的结构体变量作为全局的管理信息变量,这个变量访问的特点是初始化之后,很少会去进行改动,绝大部分情况下都是多个线程进行读取这个结构中不同的成员。这种情况下,如果我们还是使用普通的自旋锁或信号量,效率是非常低的。实际上如果有多个线程都只是读这个结构,根本没有必要去加锁,只有要修改的时候才需要确保修改过程不会被打断。
对于此类读数据频率远大于写数据频率的场景,Linux提供了读写锁。读写锁也叫做共享-独占锁(shared-exclusive)。当读者进行加锁时,是以共享模式上锁;当写者进行加锁时,是以独占模式上锁。
读写是互斥的,读的时候不能写,写的时候不能读,但允许同时有多个读者读。
1. 当没有加锁时,读取的加锁操作和写入的加锁操作都可以满足
2. 当持有读锁时,所有读者请求的加锁操作都能满足,写者的加锁请求不能满足
3. 当持有写锁时,所有的读者的加锁操作都不能满足,所有的写者的加锁操作也不能满足,读与写之间是互斥的,写与写之间也是互斥的。
Linux下读写锁可以看做是自旋锁的变种。读写锁内部实现上核心代码如下:
//读写锁初始化锁值
#define RW_LOCK_BIAS 0x01000000
//读写锁的底层数据结构
typedef struct{unsigned int lock;
}arch_rwlock_t;
//释放读锁
static inline void arch_read_unlock(arch_rwlock_t*rw){ asm volatile(LOCK_PREFIX"incl %0" //原子对lock加1:"+m"(rw->lock)::"memory");
}
//释放写锁
static inline void arch_write_unlock(arch_rwlock_t*rw){asm volatile(LOCK_PREFIX"addl %1, %0"//原子对lock加上RW_LOCK_BIAS:"+m"(rw->lock):"i"(RW_LOCK_BIAS):"memory");
}
//获取写锁失败时调用
ENTRY(__write_lock_failed)//(%eax)表示由eax指向的内存空间是调用者传进来的 2:LOCK_PREFIX addl $ RW_LOCK_BIAS,(%eax)1:rep;nop//空指令cmpl $RW_LOCK_BIAS,(%eax)//不等于初始值则循环比较,相等则表示有进程释放了写锁jne 1b//执行加写锁LOCK_PREFIX subl $ RW_LOCK_BIAS,(%eax)jnz 2b //不为0则继续测试,为0则表示加写锁成功ret //返回
ENDPROC(__write_lock_failed)
//获取读锁失败时调用
ENTRY(__read_lock_failed)//(%eax)表示由eax指向的内存空间是调用者传进来的 2:LOCK_PREFIX incl(%eax)//原子加11: rep; nop//空指令cmpl $1,(%eax) //和1比较 小于0则js 1b //为负则继续循环比较LOCK_PREFIX decl(%eax) //加读锁js 2b //为负则继续加1并比较,否则返回ret //返回
ENDPROC(__read_lock_failed)
//获取读锁
static inline void arch_read_lock(arch_rwlock_t*rw){asm volatile(LOCK_PREFIX" subl $1,(%0)\n\t"//原子对lock减1"jns 1f\n"//不为小于0则跳转标号1处,表示获取读锁成功"call __read_lock_failed\n\t"//调用__read_lock_failed"1:\n"::LOCK_PTR_REG(rw):"memory");
}
//获取写锁
static inline void arch_write_lock(arch_rwlock_t*rw){asm volatile(LOCK_PREFIX"subl %1,(%0)\n\t"//原子对lock减去RW_LOCK_BIAS"jz 1f\n"//为0则跳转标号1处"call __write_lock_failed\n\t"//调用__write_lock_failed"1:\n"::LOCK_PTR_REG(rw),"i"(RW_LOCK_BIAS):"memory");
}
总结一下这段代码的要点:
1. 计数器的初值为RW_LOCK_BIAS
2. 读者加锁计数器减1
3. 写者加锁计数器减去RW_LOCK_BIAS
注意,实际使用时要使用Linux标准接口如read_lock,write_lock等,而不是arch开头的接口。