对称多处理

竟态条件

竟态条件:资源计数器释放

void release_resource()
{
     counter--;

     if (!counter)
         free_resource();
}

竟态条件场景

 

../_images/ditaa-35f7597b35b83bb0025ac2a5f158c9eae23050c8.png

避免竟态条件

Linux 内核并发源

原子操作

使用 atomic_dec_and_test() 来实现资源计数器的释放

void release_resource()
{
    if (atomic_dec_and_test(&counter))
         free_resource();
}

在 SMP 系统上原子操作可能不再是原子的

 

../_images/ditaa-ddd14be50300088958e86912bc5f396797634a3a.png

修复 SMP 系统上的原子操作(x86)

 

../_images/ditaa-c11fccb956cdf115910f9f72e1dc14cd7ed549ff.png

与中断同步(x86)

 #define local_irq_disable() \
     asm volatile („cli” : : : „memory”)

#define local_irq_enable() \
    asm volatile („sti” : : : „memory”)

#define local_irq_save(flags) \
    asm volatile ("pushf ; pop %0" :"=g" (flags)
                  : /* no input */: "memory") \
    asm volatile("cli": : :"memory")

#define local_irq_restore(flags) \
    asm volatile ("push %0 ; popf"
                  : /* no output */
                  : "g" (flags) :"memory", "cc");

自旋锁实现示例(x86)

  spin_lock:
      lock bts [my_lock], 0
      jc spin_lock

  /* 临界区 */

  spin_unlock:
      mov [my_lock], 0

**bts dts, src**——位测试并设置;它将来自 dts 内存地址的第 src 位复制到进位标志位(carry flag),然后将其设置为 1:
CF <- dts[src]
dts[src] <- 1

锁争用

高速缓存抖动

当多个处理器核心试图读写同一内存时,会发生高速缓存抖动,导致过多的高速缓存未命中。

由于自旋锁在锁争用期间不断访问内存,高速缓存抖动很常见,这是由高速缓存一致性的实现方式造成的。

同步的缓存和内存

 

../_images/ditaa-4d63c157487ff8291f2a6e93fe680ec38c1a3212.png

缺乏同步的缓存和内存

 

../_images/ditaa-7ee0f9bb5f5af586e043afd47cfbad0adcc34888.png

缓存一致性协议

缓存嗅探协议较为简单,但当核心数超过 32-64 时性能表现较差。

目录协议的缓存一致性协议能够更好地扩展(可达数千个核心),非一致性存储访问(NUMA)系统中通常使用的就是目录协议。

MESI 缓存一致性协议

MESI 状态转换

自旋锁争用导致的缓存抖动

 

../_images/ditaa-b26d802c286bda6c559b4dcfa8a7fb27f840463e.png

优化的自旋锁(KeAcquireSpinLock)

 

spin_lock:
    rep ; nop
    test lock_addr, 1
    jnz spin_lock
    lock bts lock_addr
    jc spin_lock

自旋锁队列

 

../_images/ditaa-58545831034f050660727be99cede213bc4a53c7.png

进程和中断处理程序同步死锁

用于 SMP 的中断同步

SMP 的软中断同步

抢占

 

抢占是可配置的:如果激活,它提供更低的延迟和响应时间,而如果停用,它提供更好的吞吐量。

抢占被自旋锁和互斥锁禁用,但也可以手动禁用(通过核心内核代码)。

抢占和软中断屏蔽

#define PREEMPT_BITS      8
#define SOFTIRQ_BITS      8
#define HARDIRQ_BITS      4
#define NMI_BITS          1

#define preempt_disable() preempt_count_inc()

#define local_bh_disable() add_preempt_count(SOFTIRQ_OFFSET)

#define local_bh_enable() sub_preempt_count(SOFTIRQ_OFFSET)

#define irq_count() (preempt_count() & (HARDIRQ_MASK | SOFTIRQ_MASK))

#define in_interrupt() irq_count()

asmlinkage void do_softirq(void)
{
    if (in_interrupt()) return;
    ...

互斥锁

mutex_lock() 捷径

void __sched mutex_lock(struct mutex *lock)
{
  might_sleep();

  if (!__mutex_trylock_fast(lock))
    __mutex_lock_slowpath(lock);
}

static __always_inline bool __mutex_trylock_fast(struct mutex *lock)
{
  unsigned long curr = (unsigned long)current;

  if (!atomic_long_cmpxchg_acquire(&lock->owner, 0UL, curr))
    return true;

  return false;
}

mutex_lock() 慢路径

...
  spin_lock(&lock->wait_lock);
...
  /* 添加等待的任务到等待队列尾部 (FIFO): */
  list_add_tail(&waiter.list, &lock->wait_list);
...
  waiter.task = current;
...
  for (;;) {
    if (__mutex_trylock(lock))
      goto acquired;
  ...
    spin_unlock(&lock->wait_lock);
  ...
    set_current_state(state);
    spin_lock(&lock->wait_lock);
  }
  spin_lock(&lock->wait_lock);
acquired:
  __set_current_state(TASK_RUNNING);
  mutex_remove_waiter(lock, &waiter, current);
  spin_lock(&lock->wait_lock);
...

mutex_unlock() 快速路径

void __sched mutex_unlock(struct mutex *lock)
{
  if (__mutex_unlock_fast(lock))
    return;
  __mutex_unlock_slowpath(lock, _RET_IP_);
}

static __always_inline bool __mutex_unlock_fast(struct mutex *lock)
{
  unsigned long curr = (unsigned long)current;

  if (atomic_long_cmpxchg_release(&lock->owner, curr, 0UL) == curr)
    return true;

  return false;
}

void __mutex_lock_slowpath(struct mutex *lock)
{
...
  if (__mutex_waiter_is_first(lock, &waiter))
          __mutex_set_flag(lock, MUTEX_FLAG_WAITERS);
...

mutex_unlock() 慢速路径

...
spin_lock(&lock->wait_lock);
if (!list_empty(&lock->wait_list)) {
  /* 获得等待队列的第一个条目 */
  struct mutex_waiter *waiter;
  waiter = list_first_entry(&lock->wait_list, struct mutex_waiter,
                            list);
  next = waiter->task;
  wake_q_add(&wake_q, next);
}
...
spin_unlock(&lock->wait_lock);
...
wake_up_q(&wake_q);

每个 CPU 独占的数据

乱序编译器生成的代码

C code Compiler generated code
a = 1;
b = 2;
MOV R10, 1
MOV R11, 2
STORE R11, b
STORE R10, a

屏障

读-复制-更新(RCU)

移除和回收

RCU 列表删除

 

../_images/ditaa-5193a924360bebc83d2f81188cd0b0093ec01e6a.png

列表 RCU API 速查表

/* 列表遍历 */
rcu_read_lock();
list_for_each_entry_rcu(i, head) {
  /* 不允许休眠、阻塞调用或上下文切换 */
}
rcu_read_unlock();


/* 列表元素删除  */
spin_lock(&lock);
list_del_rcu(&node->list);
spin_unlock(&lock);
synchronize_rcu();
kfree(node);

/* 列表元素添加  */
spin_lock(&lock);
list_add_rcu(head, &node->list);
spin_unlock(&lock);