内核并发与同步机制

1. 内核并发概述

Linux 内核是一个高度并发的环境。在多核处理器上,多个线程可以同时执行内核代码;在中断上下文中,硬件随时可能打断当前执行流;抢占机制允许调度器随时切换任务。这些并发源带来了严峻的挑战:竞态条件(Race Condition)死锁(Deadlock)优先级反转(Priority Inversion)

内核提供了丰富的同步原语来应对这些挑战。选择合适的同步机制需要理解其语义、性能特征和适用场景。错误的同步选择可能导致性能瓶颈或难以调试的 bug。

并发来源: ┌──────────────────────────────────────────────────────────────┐ │ Kernel Space │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ CPU 0 │ │ CPU 1 │ │ CPU N │ ← 多处理器并行 │ │ │ Thread A │ │ Thread B │ │ Thread C │ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ └─────────────┴─────────────┘ │ │ │ │ │ ┌─────────┴─────────┐ │ │ │ Shared Data │ │ │ └───────────────────┘ │ │ ▲ │ │ ┌─────────────┘ │ │ │ │ │ ┌────┴────┐ │ │ │ 软中断/硬中断 │ ← 异步中断上下文 │ │ └─────────┘ │ │ │ │ ┌─────────────────┐ │ │ │ 内核抢占点 │ ← 调度器随时可能抢占 │ │ │ (schedule() 等) │ │ │ └─────────────────┘ │ └──────────────────────────────────────────────────────────────┘
图 1.1:内核中的并发来源

1.1 内核抢占与上下文

理解内核同步首先要理解执行上下文。Linux 内核代码可能在以下上下文运行:

  • 进程上下文:执行系统调用或内核线程,可以睡眠(调用 schedule)
  • 软中断上下文:执行软中断、tasklet,不能睡眠
  • 硬中断上下文:执行中断处理程序,不能睡眠,时间要求严格

不同的同步原语适用于不同的上下文。例如,互斥锁只能在进程上下文使用,而自旋锁可以在中断上下文使用(需小心)。

2. 原子操作

原子操作是最基础的同步原语,保证操作不可分割。现代处理器提供原子指令(如 x86 的 LOCK 前缀、ARM 的 LDREX/STREX),内核将其封装为统一的 API。

2.1 整数原子操作

/* include/linux/atomic.h */
#include <linux/atomic.h>

atomic_t counter = ATOMIC_INIT(0);

/* 基本操作 */
atomic_set(&counter, 5);           /* 设置值 */
int val = atomic_read(&counter);    /* 读取值 */

/* 算术操作(返回新值或旧值) */
atomic_inc(&counter);               /* 递增 */
atomic_dec(&counter);               /* 递减 */
atomic_add(10, &counter);           /* 加 10 */
atomic_sub(5, &counter);            /* 减 5 */

/* 带返回值的变体 */
int new = atomic_inc_return(&counter);  /* 返回递增后的值 */
int old = atomic_fetch_add(5, &counter); /* 返回加 5 前的值 */

/* 条件操作 */
int ret = atomic_add_unless(&counter, 1, 100);  /* 除非 counter==100,否则加 1 */
bool swapped = atomic_cmpxchg(&counter, 5, 10); /* CAS: 如果==5则设为10 */

/* 64位版本 */
atomic64_t big_counter = ATOMIC64_INIT(0);
atomic64_add(10000000000LL, &big_counter);

2.2 位操作

/* 原子位操作作用于普通内存地址 */
unsigned long flags = 0;

set_bit(0, &flags);          /* 设置第 0 位 */
clear_bit(1, &flags);        /* 清除第 1 位 */
change_bit(2, &flags);       /* 翻转第 2 位 */

/* 测试并操作 */
if (test_and_set_bit(3, &flags))  /* 测试并设置 */
    printk("Bit 3 was already set\n");

/* 查找操作 */
int first = find_first_bit(&flags, 64);   /* 找第一个置位 */
int zero = find_first_zero_bit(&flags, 64); /* 找第一个清零位 */

/* 非原子位操作(带 __ 前缀)用于已受保护的区域 */
__set_bit(4, &flags);  /* 仅在持有锁时使用 */
原子操作与内存序

Linux 原子操作默认提供顺序一致性。对于更精细的控制,内核提供 smp_mb()(内存屏障)、smp_rmb()(读屏障)、smp_wmb()(写屏障)等原语。现代代码推荐使用显式内存序的原子操作。

3. 自旋锁

自旋锁(Spinlock)是最常用的内核锁。当一个 CPU 尝试获取已被持有的锁时,它会"自旋"——忙等待直到锁可用。自旋锁适用于保护临界区很短的情况,因为等待期间占用 CPU。

3.1 基本用法

/* include/linux/spinlock.h */
#include <linux/spinlock.h>

/* 定义并初始化 */
static DEFINE_SPINLOCK(my_lock);

/* 或使用动态初始化 */
spinlock_t my_lock;
spin_lock_init(&my_lock);

/* 基本使用 */
spin_lock(&my_lock);        /* 获取锁 */
/* 临界区:访问共享数据 */
shared_data.counter++;
spin_unlock(&my_lock);      /* 释放锁 */

/* 中断安全的变体 */
unsigned long flags;
spin_lock_irqsave(&my_lock, flags);   /* 保存中断状态并禁用中断 */
/* 临界区 */
spin_unlock_irqrestore(&my_lock, flags); /* 恢复中断状态 */

3.2 自旋锁的实现原理

/* 简化的自旋锁实现( ticket spinlock ) */
typedef struct {
    union {
        u32 slock;
        struct {
            u16 owner;
            u16 next;
        };
    };
} arch_spinlock_t;

/* 加锁 */
static __always_inline void arch_spin_lock(arch_spinlock_t *lock)
{
    u32 val = 0;
    
    /*
     * xadd: 原子交换并相加
     * 返回旧值,next = 旧值 + 1
     */
    val = atomic_fetch_add(1 << TICKET_SHIFT, &lock->slock);
    
    u16 ticket = val >> TICKET_SHIFT;  /* 我的票号 */
    u16 owner = val & TICKET_MASK;      /* 当前服务号 */
    
    /* 等待叫号 */
    if (ticket != owner) {
        /* 自旋等待 */
        do {
            cpu_relax();  /* 提示 CPU 这是自旋循环 */
            owner = READ_ONCE(lock->owner);
        } while (ticket != owner);
    }
}

/* 解锁 */
static __always_inline void arch_spin_unlock(arch_spinlock_t *lock)
{
    /* 叫下一个号 */
    smp_store_release(&lock->owner, lock->owner + 1);
}
Ticket Spinlock 原理: 初始状态:owner=0, next=0 CPU 0 获取锁: ticket = next(0), next 变为 1 owner(0) == ticket(0) ✓ 获得锁 状态:owner=0, next=1 ← CPU 0 持有 CPU 1 尝试获取: ticket = next(1), next 变为 2 owner(0) != ticket(1) → 自旋等待 CPU 2 尝试获取: ticket = next(2), next 变为 3 owner(0) != ticket(2) → 自旋等待 CPU 0 释放: owner++ → owner=1 CPU 1 看到 owner(1)==ticket(1),获得锁 状态:owner=1, next=3 ← CPU 1 持有
图 3.1:Ticket Spinlock 保证 FIFO 顺序

3.3 读写自旋锁

/* 读写锁允许多个读者或单个写者 */
static DEFINE_RWLOCK(my_rwlock);

/* 读者 */
read_lock(&my_rwlock);
/* 只读访问共享数据 */
val = shared_data.counter;
read_unlock(&my_rwlock);

/* 写者 */
write_lock(&my_rwlock);
/* 独占访问,修改数据 */
shared_data.counter++;
write_unlock(&my_rwlock);

/* 读写锁变体:seqlock */
/* 适合读者极多、写者极少的情况 */
static DEFINE_SEQLOCK(my_seqlock);

/* 读者 */
unsigned seq;
do {
    seq = read_seqbegin(&my_seqlock);
    /* 读取数据 */
    val = shared_data.counter;
} while (read_seqretry(&my_seqlock, seq));

/* 写者 */
write_seqlock(&my_seqlock);
shared_data.counter++;
write_sequnlock(&my_seqlock);

4. 互斥锁与信号量

当临界区可能较长或需要睡眠时,自旋锁不再适用(持有自旋锁时禁止睡眠)。互斥锁(Mutex)信号量(Semaphore)允许获取者在无法立即获得锁时睡眠,将 CPU 让给其他任务。

4.1 互斥锁

/* include/linux/mutex.h */
#include <linux/mutex.h>

static DEFINE_MUTEX(my_mutex);

/* 获取互斥锁(可能睡眠) */
mutex_lock(&my_mutex);
/* 临界区 - 可以安全睡眠 */
schedule_timeout(HZ);  /* 睡眠 1 秒 */
mutex_unlock(&my_mutex);

/* 非阻塞尝试 */
if (mutex_trylock(&my_mutex)) {
    /* 获得锁 */
    mutex_unlock(&my_mutex);
} else {
    printk("Lock is busy\n");
}

/* 带超时的获取 */
if (mutex_lock_interruptible(&my_mutex) == 0) {
    /* 获得锁(或被信号中断)*/
    mutex_unlock(&my_mutex);
}

/* 互斥锁限制:
 * - 同一时间只有一个持有者
 * - 必须由持有者释放
 * - 不能在中断上下文使用
 * - 递归上锁会导致死锁
 */

4.2 信号量

/* include/linux/semaphore.h */
#include <linux/semaphore.h>

/* 二值信号量(类似互斥锁) */
static DEFINE_SEMAPHORE(my_sem);

down(&my_sem);      /* P 操作 - 获取 */
/* 临界区 */
up(&my_sem);        /* V 操作 - 释放 */

/* 计数信号量(允许多个持有者) */
struct semaphore sem;
sema_init(&sem, 5);  /* 最多 5 个并发 */

down(&sem);         /* 计数 -1,为 0 时阻塞 */
/* 执行操作 */
up(&sem);           /* 计数 +1 */

/* 非阻塞变体 */
if (down_trylock(&sem) == 0) {
    /* 获得 */
    up(&sem);
}

/* 可中断变体 */
if (down_interruptible(&sem) == 0) {
    /* 获得 */
    up(&sem);
}

/* 超时变体 */
if (down_timeout(&sem, HZ * 5) == 0) {
    /* 5 秒内获得 */
    up(&sem);
}

4.3 完成量

/* 完成量用于等待某个事件完成 */
#include <linux/completion.h>

static DECLARE_COMPLETION(my_completion);

/* 等待者 */
wait_for_completion(&my_completion);  /* 阻塞等待 */

/* 唤醒者 */
complete(&my_completion);             /* 唤醒一个等待者 */
complete_all(&my_completion);         /* 唤醒所有等待者 */

/* 典型场景:等待异步操作完成 */
void async_worker(struct work_struct *work)
{
    /* 执行耗时操作 */
    do_work();
    
    /* 通知完成 */
    complete(&my_completion);
}

/* 发起者 */
schedule_work(&my_work);
wait_for_completion(&my_completion);  /* 等待工作完成 */

5. RCU 机制

读-复制-更新(Read-Copy-Update)是 Linux 内核最具创新的同步机制。RCU 允许多个读者与写者并发执行,读者完全无锁,写者通过复制和延迟释放实现更新。RCU 适用于读多写少的数据结构。

RCU 更新过程: 1. 读者开始 (rcu_read_lock()) ┌──────────────────────────────────────┐ │ 读者持有对旧数据的引用 │ └──────────────────────────────────────┘ 2. 写者更新 (copy + modify) 旧数据 ──→ 复制 ──→ 修改副本 ──→ 发布新指针 [A,B,C] [A,B,C] [A,B,C,D] [A,B,C,D] (smp_store_release) 3. 新读者看到新数据 ┌──────────────────────────────────────┐ │ 新读者看到 [A,B,C,D] │ │ 旧读者仍看到 [A,B,C] │ └──────────────────────────────────────┘ 4. 等待宽限期 (synchronize_rcu()) - 确保所有旧读者完成 - 系统经历一次上下文切换 5. 安全释放旧数据 (kfree_rcu()) 旧数据 [A,B,C] 被回收
图 5.1:RCU 更新过程示意图
/* include/linux/rcupdate.h */
#include <linux/rcupdate.h>
#include <linux/rculist.h>

/* 数据结构 */
struct my_data {
    int value;
    struct rcu_head rcu;  /* 用于延迟释放 */
};

struct my_data __rcu *global_ptr;  /* __rcu 注解表示 RCU 保护 */

/* 读者 - 完全无锁 */
void reader(void)
{
    struct my_data *ptr;
    
    rcu_read_lock();           /* 进入 RCU 读侧临界区 */
    
    /* 解引用 RCU 指针 */
    ptr = rcu_dereference(global_ptr);
    if (ptr)
        printk("Value: %d\n", ptr->value);
    
    rcu_read_unlock();         /* 退出临界区 */
}

/* 写者 - 复制并更新 */
void updater(void)
{
    struct my_data *new_data;
    struct my_data *old_data;
    
    /* 1. 分配新数据 */
    new_data = kmalloc(sizeof(*new_data), GFP_KERNEL);
    
    /* 2. 复制旧数据并修改 */
    old_data = rcu_dereference_protected(global_ptr, 
                                          lockdep_is_held(&my_mutex));
    *new_data = *old_data;
    new_data->value = 42;
    
    /* 3. 发布新指针(原子操作)*/
    rcu_assign_pointer(global_ptr, new_data);
    
    /* 4. 等待宽限期 */
    synchronize_rcu();         /* 阻塞直到所有旧读者完成 */
    
    /* 5. 安全释放旧数据 */
    kfree(old_data);
}

/* 更高效的延迟释放 */
void updater_kfree_rcu(void)
{
    struct my_data *new_data, *old_data;
    
    new_data = kmalloc(sizeof(*new_data), GFP_KERNEL);
    old_data = rcu_dereference_protected(global_ptr, 1);
    
    *new_data = *old_data;
    new_data->value = 42;
    
    rcu_assign_pointer(global_ptr, new_data);
    
    /* 异步延迟释放,不阻塞 */
    kfree_rcu(old_data, rcu);
}

5.1 RCU 链表

/* RCU 保护的链表操作 */
#include <linux/rculist.h>

struct node {
    int data;
    struct list_head list;
    struct rcu_head rcu;
};

static LIST_HEAD(head);

/* 遍历(读者侧) */
void list_reader(void)
{
    struct node *node;
    
    rcu_read_lock();
    list_for_each_entry_rcu(node, &head, list) {
        printk("Data: %d\n", node->data);
    }
    rcu_read_unlock();
}

/* 添加节点 */
void list_adder(struct node *new_node)
{
    /* 插入到链表(使用 rcu 版本) */
    list_add_rcu(&new_node->list, &head);
}

/* 删除节点 */
void list_remover(struct node *target)
{
    /* 从链表移除 */
    list_del_rcu(&target->list);
    
    /* 等待宽限期后释放 */
    synchronize_rcu();
    kfree(target);
    
    /* 或使用异步释放 */
    /* list_del_rcu(&target->list);
     * kfree_rcu(target, rcu);
     */
}

6. Per-CPU 变量

最高效的同步是不同步。Per-CPU 变量为每个 CPU 维护独立的数据副本,完全避免缓存行竞争。这是实现高性能统计计数器的首选方法。

/* include/linux/percpu.h */
#include <linux/percpu.h>

/* 定义 per-cpu 变量 */
DEFINE_PER_CPU(long, page_fault_count);

/* 动态分配 */
long __percpu *dynamic_counter;
dynamic_counter = alloc_percpu(long);

/* 访问当前 CPU 的变量 */
__this_cpu_inc(page_fault_count);           /* 增加 */
long count = __this_cpu_read(page_fault_count); /* 读取 */

/* 安全的访问(禁用抢占) */
get_cpu_var(page_fault_count)++;
put_cpu_var(page_fault_count);

/* 遍历所有 CPU */
long total = 0;
for_each_possible_cpu(cpu) {
    total += per_cpu(page_fault_count, cpu);
}

/* 释放 */
free_percpu(dynamic_counter);

/* 实际应用:网络统计 */
struct net_stats {
    u64 rx_packets;
    u64 rx_bytes;
    u64 tx_packets;
    u64 tx_bytes;
};

DEFINE_PER_CPU(struct net_stats, netdev_stats);

void netdev_rx(struct sk_buff *skb)
{
    struct net_stats *stats = this_cpu_ptr(&netdev_stats);
    
    u64_stats_update_begin(&stats->syncp);
    stats->rx_packets++;
    stats->rx_bytes += skb->len;
    u64_stats_update_end(&stats->syncp);
}

7. 无锁编程技术

除了 RCU,内核还提供其他无锁编程技术,适用于特定场景。

7.1 内存屏障

/* 内存屏障保证执行顺序 */
/* 编译器屏障 */
barrier();

/* 处理器内存屏障 */
smp_mb();   /* 全内存屏障 */
smp_rmb();  /* 读屏障 */
smp_wmb();  /* 写屏障 */

/* 示例:双缓冲 */
struct buffer *front, *back;

void producer(struct data *d)
{
    /* 写入 back buffer */
    write_data(back, d);
    
    /* 确保数据写入完成才交换指针 */
    smp_wmb();
    
    /* 交换指针 */
    swap(front, back);
}

void consumer(void)
{
    struct data *d = read_data(front);
    
    /* 确保读取在指针交换之后 */
    smp_rmb();
    
    process(d);
}

7.2 比较并交换(CAS)

/* 实现无锁栈 */
struct node {
    void *data;
    struct node *next;
};

struct node __rcu *stack_top;

void push(void *data)
{
    struct node *new = kmalloc(sizeof(*new), GFP_KERNEL);
    struct node *old;
    
    new->data = data;
    
    do {
        old = rcu_dereference_protected(stack_top, 1);
        new->next = old;
        /* CAS: 如果 stack_top == old,则设为 new */
    } while (cmpxchg(&stack_top, old, new) != old);
}

void *pop(void)
{
    struct node *old, *new;
    void *data;
    
    do {
        old = rcu_dereference_protected(stack_top, 1);
        if (!old)
            return NULL;
        new = old->next;
        /* CAS: 如果 stack_top == old,则设为 new */
    } while (cmpxchg(&stack_top, old, new) != old);
    
    data = old->data;
    kfree_rcu(old, rcu);
    return data;
}

8. 小结

内核同步是系统编程中最具挑战性的领域之一。选择合适的同步原语需要综合考虑性能、复杂性和适用场景。本章系统介绍了 Linux 内核的同步机制,从基础的原子操作到复杂的 RCU 机制。

同步原语选择指南: ┌────────────────────────────────────────────────────────────────┐ │ 问题类型 │ 推荐原语 │ ├────────────────────────────────────────────────────────────────┤ │ 简单计数器 │ atomic_t / atomic64_t │ │ 位图操作 │ 原子位操作 │ │ 短临界区、禁止睡眠 │ spinlock_t │ │ 读多写少 │ rwlock_t / seqlock_t │ │ 长临界区、允许睡眠 │ mutex_t │ │ 资源池管理 │ semaphore / completion │ │ 读极多、写极少 │ RCU │ │ 每 CPU 统计 │ per-cpu 变量 │ │ 无锁数据结构 │ CAS + 内存屏障 │ └────────────────────────────────────────────────────────────────┘
表 8.1:同步原语选择指南

关键要点:

  • 原子操作是最基础的同步,适用于简单计数器和标志位
  • 自旋锁适用于短临界区,持有者不能睡眠
  • 互斥锁适用于长临界区,可以睡眠,开销较大
  • RCU是读多写少场景的最佳选择,读者完全无锁
  • Per-CPU 变量避免同步的最佳实践,适合统计计数
同步最佳实践

1. 保持临界区尽可能短
2. 避免在持有锁时调用可能睡眠的函数
3. 按固定顺序获取多个锁,避免死锁
4. 优先使用 RCU 和 per-cpu 变量,减少锁竞争
5. 使用 lockdep 检测潜在的锁问题

掌握这些同步机制是编写高性能、可靠内核代码的基础。在实际开发中,建议使用内核提供的 lockdep 和 KCSAN 等工具检测竞态条件和数据竞争。下一章,我们将探讨 Linux 文件系统实现,理解 VFS 层如何统一各种文件系统。

← 返回文章列表