1. 内核并发概述
Linux 内核是一个高度并发的环境。在多核处理器上,多个线程可以同时执行内核代码;在中断上下文中,硬件随时可能打断当前执行流;抢占机制允许调度器随时切换任务。这些并发源带来了严峻的挑战:竞态条件(Race Condition)、死锁(Deadlock)和优先级反转(Priority Inversion)。
内核提供了丰富的同步原语来应对这些挑战。选择合适的同步机制需要理解其语义、性能特征和适用场景。错误的同步选择可能导致性能瓶颈或难以调试的 bug。
1.1 内核抢占与上下文
理解内核同步首先要理解执行上下文。Linux 内核代码可能在以下上下文运行:
- 进程上下文:执行系统调用或内核线程,可以睡眠(调用 schedule)
- 软中断上下文:执行软中断、tasklet,不能睡眠
- 硬中断上下文:执行中断处理程序,不能睡眠,时间要求严格
不同的同步原语适用于不同的上下文。例如,互斥锁只能在进程上下文使用,而自旋锁可以在中断上下文使用(需小心)。
2. 原子操作
原子操作是最基础的同步原语,保证操作不可分割。现代处理器提供原子指令(如 x86 的 LOCK 前缀、ARM 的 LDREX/STREX),内核将其封装为统一的 API。
2.1 整数原子操作
/* include/linux/atomic.h */
#include <linux/atomic.h>
atomic_t counter = ATOMIC_INIT(0);
/* 基本操作 */
atomic_set(&counter, 5); /* 设置值 */
int val = atomic_read(&counter); /* 读取值 */
/* 算术操作(返回新值或旧值) */
atomic_inc(&counter); /* 递增 */
atomic_dec(&counter); /* 递减 */
atomic_add(10, &counter); /* 加 10 */
atomic_sub(5, &counter); /* 减 5 */
/* 带返回值的变体 */
int new = atomic_inc_return(&counter); /* 返回递增后的值 */
int old = atomic_fetch_add(5, &counter); /* 返回加 5 前的值 */
/* 条件操作 */
int ret = atomic_add_unless(&counter, 1, 100); /* 除非 counter==100,否则加 1 */
bool swapped = atomic_cmpxchg(&counter, 5, 10); /* CAS: 如果==5则设为10 */
/* 64位版本 */
atomic64_t big_counter = ATOMIC64_INIT(0);
atomic64_add(10000000000LL, &big_counter);
2.2 位操作
/* 原子位操作作用于普通内存地址 */
unsigned long flags = 0;
set_bit(0, &flags); /* 设置第 0 位 */
clear_bit(1, &flags); /* 清除第 1 位 */
change_bit(2, &flags); /* 翻转第 2 位 */
/* 测试并操作 */
if (test_and_set_bit(3, &flags)) /* 测试并设置 */
printk("Bit 3 was already set\n");
/* 查找操作 */
int first = find_first_bit(&flags, 64); /* 找第一个置位 */
int zero = find_first_zero_bit(&flags, 64); /* 找第一个清零位 */
/* 非原子位操作(带 __ 前缀)用于已受保护的区域 */
__set_bit(4, &flags); /* 仅在持有锁时使用 */
Linux 原子操作默认提供顺序一致性。对于更精细的控制,内核提供 smp_mb()(内存屏障)、smp_rmb()(读屏障)、smp_wmb()(写屏障)等原语。现代代码推荐使用显式内存序的原子操作。
3. 自旋锁
自旋锁(Spinlock)是最常用的内核锁。当一个 CPU 尝试获取已被持有的锁时,它会"自旋"——忙等待直到锁可用。自旋锁适用于保护临界区很短的情况,因为等待期间占用 CPU。
3.1 基本用法
/* include/linux/spinlock.h */
#include <linux/spinlock.h>
/* 定义并初始化 */
static DEFINE_SPINLOCK(my_lock);
/* 或使用动态初始化 */
spinlock_t my_lock;
spin_lock_init(&my_lock);
/* 基本使用 */
spin_lock(&my_lock); /* 获取锁 */
/* 临界区:访问共享数据 */
shared_data.counter++;
spin_unlock(&my_lock); /* 释放锁 */
/* 中断安全的变体 */
unsigned long flags;
spin_lock_irqsave(&my_lock, flags); /* 保存中断状态并禁用中断 */
/* 临界区 */
spin_unlock_irqrestore(&my_lock, flags); /* 恢复中断状态 */
3.2 自旋锁的实现原理
/* 简化的自旋锁实现( ticket spinlock ) */
typedef struct {
union {
u32 slock;
struct {
u16 owner;
u16 next;
};
};
} arch_spinlock_t;
/* 加锁 */
static __always_inline void arch_spin_lock(arch_spinlock_t *lock)
{
u32 val = 0;
/*
* xadd: 原子交换并相加
* 返回旧值,next = 旧值 + 1
*/
val = atomic_fetch_add(1 << TICKET_SHIFT, &lock->slock);
u16 ticket = val >> TICKET_SHIFT; /* 我的票号 */
u16 owner = val & TICKET_MASK; /* 当前服务号 */
/* 等待叫号 */
if (ticket != owner) {
/* 自旋等待 */
do {
cpu_relax(); /* 提示 CPU 这是自旋循环 */
owner = READ_ONCE(lock->owner);
} while (ticket != owner);
}
}
/* 解锁 */
static __always_inline void arch_spin_unlock(arch_spinlock_t *lock)
{
/* 叫下一个号 */
smp_store_release(&lock->owner, lock->owner + 1);
}
3.3 读写自旋锁
/* 读写锁允许多个读者或单个写者 */
static DEFINE_RWLOCK(my_rwlock);
/* 读者 */
read_lock(&my_rwlock);
/* 只读访问共享数据 */
val = shared_data.counter;
read_unlock(&my_rwlock);
/* 写者 */
write_lock(&my_rwlock);
/* 独占访问,修改数据 */
shared_data.counter++;
write_unlock(&my_rwlock);
/* 读写锁变体:seqlock */
/* 适合读者极多、写者极少的情况 */
static DEFINE_SEQLOCK(my_seqlock);
/* 读者 */
unsigned seq;
do {
seq = read_seqbegin(&my_seqlock);
/* 读取数据 */
val = shared_data.counter;
} while (read_seqretry(&my_seqlock, seq));
/* 写者 */
write_seqlock(&my_seqlock);
shared_data.counter++;
write_sequnlock(&my_seqlock);
4. 互斥锁与信号量
当临界区可能较长或需要睡眠时,自旋锁不再适用(持有自旋锁时禁止睡眠)。互斥锁(Mutex)和信号量(Semaphore)允许获取者在无法立即获得锁时睡眠,将 CPU 让给其他任务。
4.1 互斥锁
/* include/linux/mutex.h */
#include <linux/mutex.h>
static DEFINE_MUTEX(my_mutex);
/* 获取互斥锁(可能睡眠) */
mutex_lock(&my_mutex);
/* 临界区 - 可以安全睡眠 */
schedule_timeout(HZ); /* 睡眠 1 秒 */
mutex_unlock(&my_mutex);
/* 非阻塞尝试 */
if (mutex_trylock(&my_mutex)) {
/* 获得锁 */
mutex_unlock(&my_mutex);
} else {
printk("Lock is busy\n");
}
/* 带超时的获取 */
if (mutex_lock_interruptible(&my_mutex) == 0) {
/* 获得锁(或被信号中断)*/
mutex_unlock(&my_mutex);
}
/* 互斥锁限制:
* - 同一时间只有一个持有者
* - 必须由持有者释放
* - 不能在中断上下文使用
* - 递归上锁会导致死锁
*/
4.2 信号量
/* include/linux/semaphore.h */
#include <linux/semaphore.h>
/* 二值信号量(类似互斥锁) */
static DEFINE_SEMAPHORE(my_sem);
down(&my_sem); /* P 操作 - 获取 */
/* 临界区 */
up(&my_sem); /* V 操作 - 释放 */
/* 计数信号量(允许多个持有者) */
struct semaphore sem;
sema_init(&sem, 5); /* 最多 5 个并发 */
down(&sem); /* 计数 -1,为 0 时阻塞 */
/* 执行操作 */
up(&sem); /* 计数 +1 */
/* 非阻塞变体 */
if (down_trylock(&sem) == 0) {
/* 获得 */
up(&sem);
}
/* 可中断变体 */
if (down_interruptible(&sem) == 0) {
/* 获得 */
up(&sem);
}
/* 超时变体 */
if (down_timeout(&sem, HZ * 5) == 0) {
/* 5 秒内获得 */
up(&sem);
}
4.3 完成量
/* 完成量用于等待某个事件完成 */
#include <linux/completion.h>
static DECLARE_COMPLETION(my_completion);
/* 等待者 */
wait_for_completion(&my_completion); /* 阻塞等待 */
/* 唤醒者 */
complete(&my_completion); /* 唤醒一个等待者 */
complete_all(&my_completion); /* 唤醒所有等待者 */
/* 典型场景:等待异步操作完成 */
void async_worker(struct work_struct *work)
{
/* 执行耗时操作 */
do_work();
/* 通知完成 */
complete(&my_completion);
}
/* 发起者 */
schedule_work(&my_work);
wait_for_completion(&my_completion); /* 等待工作完成 */
5. RCU 机制
读-复制-更新(Read-Copy-Update)是 Linux 内核最具创新的同步机制。RCU 允许多个读者与写者并发执行,读者完全无锁,写者通过复制和延迟释放实现更新。RCU 适用于读多写少的数据结构。
/* include/linux/rcupdate.h */
#include <linux/rcupdate.h>
#include <linux/rculist.h>
/* 数据结构 */
struct my_data {
int value;
struct rcu_head rcu; /* 用于延迟释放 */
};
struct my_data __rcu *global_ptr; /* __rcu 注解表示 RCU 保护 */
/* 读者 - 完全无锁 */
void reader(void)
{
struct my_data *ptr;
rcu_read_lock(); /* 进入 RCU 读侧临界区 */
/* 解引用 RCU 指针 */
ptr = rcu_dereference(global_ptr);
if (ptr)
printk("Value: %d\n", ptr->value);
rcu_read_unlock(); /* 退出临界区 */
}
/* 写者 - 复制并更新 */
void updater(void)
{
struct my_data *new_data;
struct my_data *old_data;
/* 1. 分配新数据 */
new_data = kmalloc(sizeof(*new_data), GFP_KERNEL);
/* 2. 复制旧数据并修改 */
old_data = rcu_dereference_protected(global_ptr,
lockdep_is_held(&my_mutex));
*new_data = *old_data;
new_data->value = 42;
/* 3. 发布新指针(原子操作)*/
rcu_assign_pointer(global_ptr, new_data);
/* 4. 等待宽限期 */
synchronize_rcu(); /* 阻塞直到所有旧读者完成 */
/* 5. 安全释放旧数据 */
kfree(old_data);
}
/* 更高效的延迟释放 */
void updater_kfree_rcu(void)
{
struct my_data *new_data, *old_data;
new_data = kmalloc(sizeof(*new_data), GFP_KERNEL);
old_data = rcu_dereference_protected(global_ptr, 1);
*new_data = *old_data;
new_data->value = 42;
rcu_assign_pointer(global_ptr, new_data);
/* 异步延迟释放,不阻塞 */
kfree_rcu(old_data, rcu);
}
5.1 RCU 链表
/* RCU 保护的链表操作 */
#include <linux/rculist.h>
struct node {
int data;
struct list_head list;
struct rcu_head rcu;
};
static LIST_HEAD(head);
/* 遍历(读者侧) */
void list_reader(void)
{
struct node *node;
rcu_read_lock();
list_for_each_entry_rcu(node, &head, list) {
printk("Data: %d\n", node->data);
}
rcu_read_unlock();
}
/* 添加节点 */
void list_adder(struct node *new_node)
{
/* 插入到链表(使用 rcu 版本) */
list_add_rcu(&new_node->list, &head);
}
/* 删除节点 */
void list_remover(struct node *target)
{
/* 从链表移除 */
list_del_rcu(&target->list);
/* 等待宽限期后释放 */
synchronize_rcu();
kfree(target);
/* 或使用异步释放 */
/* list_del_rcu(&target->list);
* kfree_rcu(target, rcu);
*/
}
6. Per-CPU 变量
最高效的同步是不同步。Per-CPU 变量为每个 CPU 维护独立的数据副本,完全避免缓存行竞争。这是实现高性能统计计数器的首选方法。
/* include/linux/percpu.h */
#include <linux/percpu.h>
/* 定义 per-cpu 变量 */
DEFINE_PER_CPU(long, page_fault_count);
/* 动态分配 */
long __percpu *dynamic_counter;
dynamic_counter = alloc_percpu(long);
/* 访问当前 CPU 的变量 */
__this_cpu_inc(page_fault_count); /* 增加 */
long count = __this_cpu_read(page_fault_count); /* 读取 */
/* 安全的访问(禁用抢占) */
get_cpu_var(page_fault_count)++;
put_cpu_var(page_fault_count);
/* 遍历所有 CPU */
long total = 0;
for_each_possible_cpu(cpu) {
total += per_cpu(page_fault_count, cpu);
}
/* 释放 */
free_percpu(dynamic_counter);
/* 实际应用:网络统计 */
struct net_stats {
u64 rx_packets;
u64 rx_bytes;
u64 tx_packets;
u64 tx_bytes;
};
DEFINE_PER_CPU(struct net_stats, netdev_stats);
void netdev_rx(struct sk_buff *skb)
{
struct net_stats *stats = this_cpu_ptr(&netdev_stats);
u64_stats_update_begin(&stats->syncp);
stats->rx_packets++;
stats->rx_bytes += skb->len;
u64_stats_update_end(&stats->syncp);
}
7. 无锁编程技术
除了 RCU,内核还提供其他无锁编程技术,适用于特定场景。
7.1 内存屏障
/* 内存屏障保证执行顺序 */
/* 编译器屏障 */
barrier();
/* 处理器内存屏障 */
smp_mb(); /* 全内存屏障 */
smp_rmb(); /* 读屏障 */
smp_wmb(); /* 写屏障 */
/* 示例:双缓冲 */
struct buffer *front, *back;
void producer(struct data *d)
{
/* 写入 back buffer */
write_data(back, d);
/* 确保数据写入完成才交换指针 */
smp_wmb();
/* 交换指针 */
swap(front, back);
}
void consumer(void)
{
struct data *d = read_data(front);
/* 确保读取在指针交换之后 */
smp_rmb();
process(d);
}
7.2 比较并交换(CAS)
/* 实现无锁栈 */
struct node {
void *data;
struct node *next;
};
struct node __rcu *stack_top;
void push(void *data)
{
struct node *new = kmalloc(sizeof(*new), GFP_KERNEL);
struct node *old;
new->data = data;
do {
old = rcu_dereference_protected(stack_top, 1);
new->next = old;
/* CAS: 如果 stack_top == old,则设为 new */
} while (cmpxchg(&stack_top, old, new) != old);
}
void *pop(void)
{
struct node *old, *new;
void *data;
do {
old = rcu_dereference_protected(stack_top, 1);
if (!old)
return NULL;
new = old->next;
/* CAS: 如果 stack_top == old,则设为 new */
} while (cmpxchg(&stack_top, old, new) != old);
data = old->data;
kfree_rcu(old, rcu);
return data;
}
8. 小结
内核同步是系统编程中最具挑战性的领域之一。选择合适的同步原语需要综合考虑性能、复杂性和适用场景。本章系统介绍了 Linux 内核的同步机制,从基础的原子操作到复杂的 RCU 机制。
关键要点:
- 原子操作是最基础的同步,适用于简单计数器和标志位
- 自旋锁适用于短临界区,持有者不能睡眠
- 互斥锁适用于长临界区,可以睡眠,开销较大
- RCU是读多写少场景的最佳选择,读者完全无锁
- Per-CPU 变量避免同步的最佳实践,适合统计计数
1. 保持临界区尽可能短
2. 避免在持有锁时调用可能睡眠的函数
3. 按固定顺序获取多个锁,避免死锁
4. 优先使用 RCU 和 per-cpu 变量,减少锁竞争
5. 使用 lockdep 检测潜在的锁问题
掌握这些同步机制是编写高性能、可靠内核代码的基础。在实际开发中,建议使用内核提供的 lockdep 和 KCSAN 等工具检测竞态条件和数据竞争。下一章,我们将探讨 Linux 文件系统实现,理解 VFS 层如何统一各种文件系统。