2024年3月9日发(作者:)
Kernel Locking 中文版
Unreliable Guide To Locking
Rusty Russell
翻译:
albcamus
Copyright © 2003 Rusty Russell
This documentation is free software; you can redistribute it
and/or modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2 of
the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public
License along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
USA
For more details see the file COPYING in the source
distribution of Linux.
第1章. 介绍
欢迎进入Rusty优秀的《Unreliable Guide to Kernel Locking》细节。本文档描述
了Linux 2.6内核的锁系统。
随着Linux内核引入了超线程与 抢占 ,每一个Hacking内核的人都应该了解并发性
与为
SMP
加锁的基础知识。
第2章. 并发带来的问题
(如果你已知道并发是什么,请略过本章).
在一个普通的程序里,你可以这样为一个计数器增加计数:
very_important_count++;
下面是你期望的结果:
Table 2-1. 期望的结果
实例 1
读 very_important_count (5)
加1 (6)
写回 very_important_count (6)
事情还可能这样发生:
Table 2-2. 可能的结果
实例 1
读 very_important_count (5)
加 1 (6)
写回 very_important_count (6)
实例 2
读 very_important_count (6)
加 1 (7)
写回 very_important_count (7)
实例 2
读 very_important_count (5)
加 1 (6)
写回 very_important_count (6)
2.1. 竞态条件与临界区
这种情况,结果依赖于多个任务的相对执行顺序,叫做竞态条件。包含并发问题的代
码,叫做临界区。尤其是Linux开始支持SMP机器以来,竞态条件和临界区就成为内核设计
与实现的主要问题。
抢占具有相同的影响,即使只有一个CPU也是这样的:抢占了正在临界区中执行的任
务,我们就重现了上面描述的情况。在这种情况下,抢占了其他代码的线程必须在它的临界
区独自运行(排斥其它线程)。
解决的方法是:认识到何时会发生同时访问,使用锁来保证在任意时刻只有一个实例
能够进入临界区。在Linux内核中有很多友好的原语(primitives)来帮助你做到这点──也
有不友好的原语,但我将当作它们并不存在。
第3章. 在Linux内核中加锁
我多么希望能给你一句忠告:不要与比你更不理性的人同眠;而如果是关于锁的忠告,
我会这样给出:保持简单。
不要试图引入新类型的锁。
真是够奇怪的,最后一条竟然恰恰与我的忠告相反──因为这时你已经与不理性的人
同眠了,你需要考虑弄只看门狗。(译者案:Watchdog是一种以NMI方式检查CPU死锁的机
制)
3.1. 两种主要类型的锁:自旋锁与信号量
内核中主要有两种类型的锁。最基本的类型是自旋锁(include/asm/spinlock.h),它
只允许一个持有者;如果你获取自旋锁时不成功,你将一直在自旋中尝试获得它,直到成功。
自旋锁又小又快,可以在任何地方使用。
第二种类型是信号量(include/asm/spinlock.h),它可以在任何时刻被多个持有者同
时持有(允许的持有者的数量是在信号量初试化时规定的),但是在通常情况下信号量只允许
一个持有者(这时它也叫互斥锁,mutex)。如果你获取信号量时不成功,任务就会把自身放
到一个队列中睡眠,一直到信号量被释放时才被唤醒。这意味着在你等待的时候,CPU将做
其他的事情。但是,很多情况是不允许睡眠的(参考 第 9 ),这时你应该用自旋锁而不是章
信号量。
这两种锁的任何一种都是不允许递归的:参考 7.1 小节 .
3.2. 单CPU内核与锁
在单CPU机器上,对于编译时打开了CONFIG_SMP、但没打开CONFIG_PREEMPT的内核
来说,自旋锁根本不存在。这是一个出色的设计策略:既然没有别人能在同时刻执行,就没
理由加锁。
如果编译时同时打开了CONFIG_SMP和CONFIG_PREEMPT,自旋锁的作用就仅仅是禁止
抢占,这就足以防止任何竞态了。多数情况下,我们可以把抢占等同于SMP来考虑其可能带
来的竞态问题,不必单独对付它。
当测试加锁代码时,即使你没有SMP测试环境,总应该打开CONFIG_SMP和
CONFIG_PREEMPT选项,因为这会重现关于锁的某些类型的BUG。
信号量依然存在,因为它们之所以被引入,乃是为了同步用户上下文。我们将马上看
到这一点。
3.3. 只在用户上下文加锁
如果你的数据结构只可能被用户上下文访问,你可以用信号量
(linux/asm/semaphore.h)来保护它。这是最简单的情况了:把信号量初试化为可用资源的
数量(通常是1),就可以调用down_interruptible()来获取信号量,调用up()来释放它。还
有一个down()函数,但应该避免使用它,因为如果有信号到来它不会返回。
例如:linux/net/core/netfilter.c允许用nf_register_sockopt()注册新的
setsockopt()和getsockopt()调用。注册和注销只有在模块加载与卸载时才会发生(是在引
导时执行的,没有并发问题),注册了的链表只有setsockopt()或getsockopt()系统调用才
会查阅。因此,nf_sockopt_mutex 就可以完美的保护住这些,这是因为setsockopt和
getsockopt允许睡眠。
3.4. 在用户上下文与Softirqs之间加锁
如果一个
softirq
与用户上下文共享数据,就有两个问题:首先,当前的用户上下文
可能被softirq中断;其次,临界区可能会在别的CPU进入。这时spin_lock_bh()
(include/linux/spinlock.h)就有了用武之地。它会在那个CPU上禁止softirqs,然后获
取锁。spin_unlock_bh()做相反的工作。(由于历史原因,后缀‘bh’成为对各种下半部的
通称,后者是software interrupts的旧称。其实spin_lock_bh本应叫作
spin_lock_softirq才贴切)
注意这里你也可以用spin_lock_irq()或者spin_lock_irqsave(),这样不单会禁止
softirqs,还会禁止硬件中断:参考第 4 。章
这在
UP
上也能很好地工作:自旋锁消失了,该宏变成了只是local_bh_disable()
(include/linux/interrupt.h),它会禁止softirqs运行。
3.5. 在用户上下文与Tasklets之间加锁
这种情况与上面的情况(译者案,上面“在用户上下文与softirqs之间加锁”的情况)
完全一致,因为t
asklets
本来就是作为一种softirq运行的。
3.6. 在用户上下文与Timers之间加锁
这种情况也和上面情况完全一致,因为
timers
本来就是一种softirq(译者案:Timer
是时钟中断的下半部)。从加锁观点来看,tasklets和timers的地位是等同的。
3.7. 在Tasklets/Timers之间加锁
有时一个tasklet或timer会与另一个tasklet或timer共享数据。
3.7.1. 同一个Tasklet/Timer
由于同一时刻,一个tasklet决不会在两个CPU上执行,即使在SMP机器上,你也不
必担心你的tasklet会发生重入问题(同时运行了两个实例)
3.7.2. 不同的Tasklets/Timers
如果你的tasklet或timer想要同另一个tasklet或timer共享数据,你需要对它们
二者都使用spin_lock()和spin_unlock()。没必要使用spin_lock_bh(),因为你已经处在
tasklet中了,而同一个CPU不会同时再执行其他的tasklet。
3.8. 在Softirqs之间加锁
一个softirq经常需要与自己或其它的tasklet/timer共享数据。
3.8.1. 同一个Softirq
同一个softirq可能在别的CPU上执行:你可以使用per-CPU数据(参考 8.3 小节 )
以获得更好的性能。如果你打算这样使用softirq,通常是为了获得可伸缩的高性能而带来
额外的复杂性。
你需要用spin_lock()和spin_unlock()保护共享数据。
3.8.2. 不同的 Softirqs
你需要使用spin_lock()和spin_unlock()保护共享数据,不管是timer,tasklet,
还是不同的/同一个/另一个的softirq:它们中的任何一个都可以在另一个CPU上运行。
第4章. 硬中断上下文
硬中断通常与一个tasklet或softirq通信。这通常涉及到把一个任务放到某个队列
中,再由softirq取出来。
4.1. 在硬中断与Softirqs/Tasklets之间加锁
如果一个硬件中断服务例程与一个softirq共享数据,就有两点需要考虑。第一,
softirq的执行过程可能会被硬件中断打断;第二,临界区可能会被另一个CPU上的硬件中
断进入。这正是spin_lock_irq()派上用场的地方。它在那个CPU上禁止中断,然后获取锁。
spin_unlock_irq()做相反的工作。
硬件中断服务例程中不需要使用spin_lock_irq(),因为当它在执行的时候softirq
是不可能执行的:它可以使用spin_lock(),这个会更快一些。唯一的例外是另一个硬件中
断服务例程使用了相同的锁:spin_lock_irq()会禁止那个硬件中断。
这在UP机器上也能很好的工作:自旋锁消失了,spin_lock_irq()变成了仅仅是
local_irq_disable() (include/asm/smp.h),这将会禁止sofirq/tasklet/BH的运行。
spin_lock_irqsave()(include/linux/spinlock.h)是spin_lock_irq()的变体,
它把当前的中断开启与否的状态保存在一个状态字中,用以将来传递给
spin_unlock_restore()。这意味着同样的代码既可以在硬件中断服务例程中(这时中断已
关闭)使用,也可以在softirqs中(这时需要主动禁止中断)使用。
注意,softirqs(包括tasklets和timers)是在硬件中断返回时得到运行的,因此
spin_lock_irq()同样也会禁止掉它们。从这个意义上说,spin_lock_irqsave()是最通用和
最强大的加锁函数。
4.2. 在两个硬中断服务例程之间加锁
很少有两个硬件中断服务例程共享数据的情况。如果你真的需要这样做,可以使用
spin_lock_irqsave() :在进入中断服务时是否自动关闭中断,这件事是体系结构相关的。
第5章. 关于锁的使用的图表
Pete Zaitcev 提供了这样的总结:
•
如果你处在进程上下文(任何系统调用),想把其他进程排挤出临界区,使用信号量。
你可以获得信号量之后去睡眠(例如调用copy_from_user或kmalloc(x,
GFP_KERNEL)之类的函数)。
•
否则(亦即:数据可能被中断访问),使用spin_lock_irqsave()和
spin_lock_irqrestore()。
•
避免任何持有自旋锁超过5行代码的情况,避免任何跨越函数调用的只有自旋锁的情
况。(有种情况例外,比方象readb这样的原子访问)
5.1. 加锁最低要求表
下面的表列出了在不同上下文中加锁时的最低要求。在一些情况下,同一个上下文在
给定时刻只能在一个CPU上执行,因此不需要锁(例如,某个线程在给定时刻只可能在一个
CPU上运行,但如果它需要跟另一个线程共享数据,就需要锁了)
记住上面的建议:你可以总是只用spin_lock_irqsave(),它是其它加锁原语的超集。
表 5-1. 加锁的要求
硬件中断硬件中
Softirq Softirq Tasklet Tasklet
服务例程 断服务Timer A
ABAB
A例程 B
用户
Timer B上下
文 A
用户上
下文 B
硬件中断
服务例程 None
A
硬件中断spin_loc
服务例程 k_irqsavNone
Be()
spin_l
Softirq spin_locspin_lo
ock_ir
Ak_irq()ck()
q()
spin_l
Softirq spin_locspin_lospin_lo
ock_ir
Bk_irq()ck()ck()
q()
spin_l
Tasklet spin_locspin_lospin_lo
ock_irNone
Ak_irq()ck()ck()
q()
spin_l
Tasklet spin_locspin_lospin_lospin_loc
ock_irNone
Bk_irq()ck()ck()k()
q()
spin_l
spin_locspin_lospin_lospin_locspin_loc
Timer Aock_irNone
k_irq()ck()ck()k()k()
q()
spin_l
spin_locspin_lospin_lospin_locspin_locspin_loc
Timer Bock_irNone
k_irq()ck()ck()k()k()k()
q()
spin_l
用户上下spin_locspin_lospin_lospin_locspin_locspin_locspin_lo
ock_irNone
文 Ak_irq()ck_bh()ck_bh()k_bh()k_bh()k_bh()ck_bh()
q()
spin_l
用户上下spin_locspin_lospin_lospin_locspin_locspin_loc
ock_ir
文 Bk_irq()ck_bh()ck_bh()k_bh()k_bh()k_bh()
q()
第6章. 常见的例子
让我们一步步看一下一个简单的例子:一个对映射号的缓存(译者案:原文“a cache
of number to name mappings”,虚存子系统不熟,恐怕翻译不确)。该缓存保存了对象的
使用有多频繁这个数据,当它满了,抛出最少使用的那个。
6.1. 都在用户上下文
在第一个例子中,我们假定所有的操作都发生在用户上下文(也就是,都在系统调用
中),所以允许睡眠。这意味着我们可以使用信号量来保护cache和其中的所有对象。下面
是代码:
#include
#include
#include
#include
#include
struct object
{
struct list_head list;
int id;
char name[32];
int popularity;
};
/* 保护缓存、缓存号和其中的对象 */
static DECLARE_MUTEX(cache_lock);
static LIST_HEAD(cache);
static unsigned int cache_num = 0;
#define MAX_CACHE_SIZE 10
/* 必持有cache_lock信号量 */
static struct object *__cache_find(int id)
{
struct object *i;
list_for_each_entry(i, &cache, list)
if (i->id == id) {
i->popularity++;
return i;
}
return NULL;
}
/* 必须持有cache_lock信号量 */
static void __cache_delete(struct object *obj)
{
BUG_ON(!obj);
list_del(&obj->list);
kfree(obj);
cache_num--;
}
/* 必须持有cache_lock信号量 */
static void __cache_add(struct object *obj)
{
list_add(&obj->list, &cache);
if (++cache_num > MAX_CACHE_SIZE) {
struct object *i, *outcast = NULL;
list_for_each_entry(i, &cache, list) {
if (!outcast || i->popularity < outcast-
>popularity)
outcast = i;
}
__cache_delete(outcast);
}
}
/* 对上面函数的调用,这才是信号量发挥作用的地方──译注 */
int cache_add(int id, const char *name)
{
struct object *obj;
if ((obj = kmalloc(sizeof(*obj), GFP_KERNEL)) == NULL)
return -ENOMEM;
strlcpy(obj->name, name, sizeof(obj->name));
obj->id = id;
obj->popularity = 0;
down(&cache_lock);
__cache_add(obj);
up(&cache_lock);
return 0;
}
void cache_delete(int id)
{
down(&cache_lock);
__cache_delete(__cache_find(id));
up(&cache_lock);
}
int cache_find(int id, char *name)
{
struct object *obj;
int ret = -ENOENT;
down(&cache_lock);
obj = __cache_find(id);
if (obj) {
ret = 0;
strcpy(name, obj->name);
}
up(&cache_lock);
return ret;
}
注意我们总是保证在持有cache_lock信号量的情况下对缓存添加、删除和查找操作:
缓存结构自身与其中的对象都被该信号量保护起来。这种情况很简单,因为我们为用户拷贝
数据,而不允许用户直接访问对象。
这里有一个轻量(而且很常见)的优化:在cache_add中,我们先设置对象的各个域,
然后再获取锁。这是安全的,因为没有人能够在我们把对象加入到cache之前就访问它。
6.2. 从中断上下文中访问
现在我们来考虑一下cache_find会在中断上下文中被调用的情况:这个“中断上下
文”或者是硬件中断,或者是softirq。我们使用一个timer来从cache中删除对象。
改写了的代码在下面,用标准的补丁形式给出:以-开始的行是被删除了的,以+开始
的行是新添加了的。
--- ntext 2003-12-09 13:58:54.000000000 +1100
+++ upt 2003-12-09 14:07:49.000000000 +1100
@@ -12,7 +12,7 @@
int popularity;
};
-static DECLARE_MUTEX(cache_lock);
+static spinlock_t cache_lock = SPIN_LOCK_UNLOCKED;
static LIST_HEAD(cache);
static unsigned int cache_num = 0;
#define MAX_CACHE_SIZE 10
@@ -55,6 +55,7 @@
int cache_add(int id, const char *name)
{
struct object *obj;
+ unsigned long flags;
if ((obj = kmalloc(sizeof(*obj), GFP_KERNEL)) == NULL)
return -ENOMEM;
@@ -63,30 +64,33 @@
obj->id = id;
obj->popularity = 0;
- down(&cache_lock);
+ spin_lock_irqsave(&cache_lock, flags);
__cache_add(obj);
- up(&cache_lock);
+ spin_unlock_irqrestore(&cache_lock, flags);
return 0;
}
void cache_delete(int id)
{
- down(&cache_lock);
+ unsigned long flags;
+
+ spin_lock_irqsave(&cache_lock, flags);
__cache_delete(__cache_find(id));
- up(&cache_lock);
+ spin_unlock_irqrestore(&cache_lock, flags);
}
int cache_find(int id, char *name)
{
struct object *obj;
int ret = -ENOENT;
+ unsigned long flags;
- down(&cache_lock);
+ spin_lock_irqsave(&cache_lock, flags);
obj = __cache_find(id);
if (obj) {
ret = 0;
strcpy(name, obj->name);
}
- up(&cache_lock);
+ spin_unlock_irqrestore(&cache_lock, flags);
return ret;
}
注意一下,如果中断原来是开启着的,spin_lock_irqsave会关掉它们;否则(我们
已经处在中断服务例程中)就什么也不做。这些函数可以安全地在任何上下文中调用。
不幸的是,cache_add调用了带有GFP_KERNEL标志的kmalloc,这只是在用户上下文
才是合法的。我假定cache_add仍然只会在用户上下文中被调用,否则就应该给cache_add
添加参数了。(译注:我觉得作者的意思是说,如果打算让cache_add能在中断和用户两种
上下文中使用,应该添加一个参数来表达究竟是在哪种上下文中调用的。FIXME)
6.3. 把对象暴露在文件之外
如果对象包含更多的信息,仅仅提供拷贝函数是不够的:其它代码可能会需要保持一
个指向对象的指针,例如,不想每次访问它都要先查找。这就产生了两个问题。
第一个问题是,我们采用cache_lock来保护对象,我们必须把这些函数定义为非
static的,这样其它文件中的代码才能使用。这使得锁的使用更加技巧化,再也不是在同
一地方了。
第二个问题是生命期问题。如果其它结构保留了一个指向我们的对象的指针,它多半
期望该指针总是有效的。很不幸,这只在你持有锁时才会得到保证,否则其它人可能调用
cache_delete来删除对象──还可能更糟糕,在删除之后又添加了另一个对象,还在原来
的地址上。
但是由于只有一把锁,你也不能总持有它,否则其它人就完全无法工作了。
该问题的解决是使用引用计数:每个持有指向对象的指针的人,在他们第一次获得该
指针时,递增一次引用计数;当他们使用完毕,递减一次引用计数。谁把引用计数减到了零,
就知道可以删除了,于是执行删除操作。
下面是代码:
--- upt 2003-12-09 14:25:43.000000000 +1100
+++ 2003-12-09 14:33:05.000000000 +1100
@@ -7,6 +7,7 @@
struct object
{
struct list_head list;
+ unsigned int refcnt;
int id;
char name[32];
int popularity;
@@ -17,6 +18,35 @@
static unsigned int cache_num = 0;
#define MAX_CACHE_SIZE 10
+static void __object_put(struct object *obj)
+{
+ if (--obj->refcnt == 0)
+ kfree(obj);
+}
+
+static void __object_get(struct object *obj)
+{
+ obj->refcnt++;
+}
+
+void object_put(struct object *obj)
+{
+ unsigned long flags;
+
+ spin_lock_irqsave(&cache_lock, flags);
+ __object_put(obj);
+ spin_unlock_irqrestore(&cache_lock, flags);
+}
+
+void object_get(struct object *obj)
+{
+ unsigned long flags;
+
+ spin_lock_irqsave(&cache_lock, flags);
+ __object_get(obj);
+ spin_unlock_irqrestore(&cache_lock, flags);
+}
+
/* Must be holding cache_lock */
static struct object *__cache_find(int id)
{
@@ -35,6 +65,7 @@
{
BUG_ON(!obj);
list_del(&obj->list);
+ __object_put(obj);
cache_num--;
}
@@ -63,6 +94,7 @@
strlcpy(obj->name, name, sizeof(obj->name));
obj->id = id;
obj->popularity = 0;
+ obj->refcnt = 1; /* The cache holds a reference */
spin_lock_irqsave(&cache_lock, flags);
__cache_add(obj);
@@ -79,18 +111,15 @@
spin_unlock_irqrestore(&cache_lock, flags);
}
-int cache_find(int id, char *name)
+struct object *cache_find(int id)
{
struct object *obj;
- int ret = -ENOENT;
unsigned long flags;
spin_lock_irqsave(&cache_lock, flags);
obj = __cache_find(id);
- if (obj) {
- ret = 0;
- strcpy(name, obj->name);
- }
+ if (obj)
+ __object_get(obj);
spin_unlock_irqrestore(&cache_lock, flags);
- return ret;
+ return obj;
}
我们把引用计数操作封装进标准的‘get’和‘put’函数中。现在我们可以用
cache_find返回对象的指针了,它具有引用计数功能。这样,持有对象指针的用户就可以
睡眠了(例如,调用copy_to_user把名字拷贝到用户空间。) (译注:最后一句的意思是
说,持有对象指针的用户不必象以前那样为了保证指针的合法性就一直持有锁了。引用计数
本身跟锁以及是否可以睡眠一点关系都没有)
另一点要注意的是,我说“每个指向对象的指针都应该有一个引用计数”──因此当
对象刚被插入到cache中时,其引用计数应当为1。 有些不同版本的实现并不使用引用计数,
但它们更加复杂。
6.3.1. 为引用计数使用原子操作
实践中,refcnt的类型应该是atomic_t。在include/asm/atomic.h中定义了几个原
子操作:这些操作保证了从系统的所有CPU的角度看,都是原子的。因此不需要锁。这种情
况下,原子操作要比自旋锁简单的多,尽管复杂情况下使用自旋锁要来得清晰。使用
atomic_inc和atomic_dec_and_test来取代标准的加减操作符,再也不用为引用计数上锁了。
--- 2003-12-09 15:00:35.000000000 +1100
+++ -atomic 2003-12-11 15:49:42.000000000 +1100
@@ -7,7 +7,7 @@
struct object
{
struct list_head list;
- unsigned int refcnt;
+ atomic_t refcnt;
int id;
char name[32];
int popularity;
@@ -18,33 +18,15 @@
static unsigned int cache_num = 0;
#define MAX_CACHE_SIZE 10
-static void __object_put(struct object *obj)
-{
- if (--obj->refcnt == 0)
- kfree(obj);
-}
-
-static void __object_get(struct object *obj)
-{
- obj->refcnt++;
-}
-
void object_put(struct object *obj)
{
- unsigned long flags;
-
- spin_lock_irqsave(&cache_lock, flags);
- __object_put(obj);
- spin_unlock_irqrestore(&cache_lock, flags);
+ if (atomic_dec_and_test(&obj->refcnt))
+ kfree(obj);
}
void object_get(struct object *obj)
{
- unsigned long flags;
-
- spin_lock_irqsave(&cache_lock, flags);
- __object_get(obj);
- spin_unlock_irqrestore(&cache_lock, flags);
+ atomic_inc(&obj->refcnt);
}
/* Must be holding cache_lock */
@@ -65,7 +47,7 @@
{
BUG_ON(!obj);
list_del(&obj->list);
- __object_put(obj);
+ object_put(obj);
cache_num--;
}
@@ -94,7 +76,7 @@
strlcpy(obj->name, name, sizeof(obj->name));
obj->id = id;
obj->popularity = 0;
- obj->refcnt = 1; /* The cache holds a reference */
+ atomic_set(&obj->refcnt, 1); /* The cache holds a reference */
spin_lock_irqsave(&cache_lock, flags);
__cache_add(obj);
@@ -119,7 +101,7 @@
spin_lock_irqsave(&cache_lock, flags);
obj = __cache_find(id);
if (obj)
- __object_get(obj);
+ object_get(obj);
spin_unlock_irqrestore(&cache_lock, flags);
return obj;
}
6.4. 保护对象自身
在上面的例子中,我们都假定对象(除了引用计数)自从创建之后就不会被更改。如果
我们希望允许改变name(译者案,指struct object中的name域),有三种可能:
•
你可以把cache_lock变成非static的,告诉人们改变name之前先获得锁。
•
你也可以提供一个cache_obj_rename函数,该函数先获得锁,然后为调用者改变
name。你需要告诉每一个需要改变name的人使用这个函数。
•
你也可以用cache_lock只保护cache自身,而使用另一把锁来保护name域。
理论上,你可以把锁的使用细粒度化(fine-grained)到这样的程度:为每个域维持一
把锁。实践中,最通用的变体是:
•
一把用来保护基础设施(infrastructure.本例中是cache链表)和所有对象的锁。至
今为止我们在示例中看到的便是这种情况。
•
一把用来保护基础设施(包括对象结构中的链表指针)的锁,另一把放在对象内部,用
来保护对象的其它域.
•
多把锁来保护基础设施(例如,Hash表的每一个链都用一把锁保护),可能配合使用每
个对象一把的锁。
下面是“每个对象一把的锁”的实现:
--- -atomic 2003-12-11 15:50:54.000000000 +1100
+++ ectlock 2003-12-11 17:15:03.000000000 +1100
@@ -6,11 +6,17 @@
struct object
{
+ /* 下面两个域,由cache_lock来保护 */
struct list_head list;
+ int popularity;
+
atomic_t refcnt;
+
+ /* Doesn't change once created. */
int id;
+
+ spinlock_t lock; /* Protects the name */
char name[32];
- int popularity;
};
static spinlock_t cache_lock = SPIN_LOCK_UNLOCKED;
@@ -77,6 +84,7 @@
obj->id = id;
obj->popularity = 0;
atomic_set(&obj->refcnt, 1); /* cache本身占一个引用数 */
+ spin_lock_init(&obj->lock);
spin_lock_irqsave(&cache_lock, flags);
__cache_add(obj);
注意我的策略是popularity域由cache_lock来保护,而不是每个对象一把的锁。这
是因为它(就象对象内部的struct list_head域一样)在逻辑上是属于基础设施的一部分的。
这样,当在__cache_add查找最少使用的对象时,我就不必去获取每个对象一把的锁了。
这个策略还有一处要注意:id域是不可更改的,这样我就不必在__cache_find()时
去获取每个对象一把的锁来保护它。每个对象一把的锁,在这里,只是被调用者使用来保护
name域的读和写。
另外,注意我在注释里说明了哪些数据由哪些锁来保护。这是非常重要的,因为它描
述了代码运行时的行为,(倘若不加注释)仅仅靠阅读代码是很难获得认识的。这正如Alan
Cox所说,“锁是用来保护数据的,而非代码。”
第7章. 常见问题
7.1. 死锁: 简单与高级
当一段代码试图两次获取同一把自旋锁时,就出现了BUG:它将永远自旋,等待着锁
被释放(自旋锁、读写自旋锁和信号量在linux中皆是非递归的)。这很容易诊断:它并非是
一个类似“呆上五夜来谈清楚毛绒绒的兔子身上有多少根毛的编码问题”(译者案:原文是
stay-up-five-nights-talk-to-fluffy-code-bunnies ,没法翻译啊)之类的问题。
试想一个稍微复杂一点的情况,假设你有一个softirq和用户上下文共享一片区域。
如果你使用了spin_lock()来保护它,很可能用户上下文持有锁时被这个softirq打断,然
后softirq尝试获得该锁──但它永远也不会成功。
这些情形称为死锁。如同上面展示的那样,它甚至会在单CPU机器上发生(尽管不会
在编译时没选上SMP的机器上发生。因为如果编译时CONFIG_SMP=n,自旋锁就消失了。这种
情况下你会造成数据腐败)。
这种死锁容易诊断:在SMP机器上,看门狗(watchdog)timer或者编译时加入
DEBUG_SPINLOCKS (include/linux/spinlock.h)会在发生死锁时立即显示它。
更复杂的问题是一种叫做“致命拥抱”的死锁,它涉及到两把或更多的锁。比方你有
一个Hash表:表中的每一个项是一个自旋锁和一个对象链表。在softirq处理函数中,你
可能会想更改某个对象在Hash表中的位置,从某个位置到另一个:你获得了旧的Hash链表
的自旋锁和新的链表的自旋锁,然后从旧的链表中删除对象,插入到新的链表中。
这里有两个问题。第一,如果你的代码试图把对象移动到相同的链表中,它就会因两
次尝试获取同一把锁而死锁;第二,如果在别的CPU上运行的同一个softirq试图把另一个
对象从你的目的链表移动到你的源链表中,下面的事情就发生了:
Table 7-1. 顺序
CPU 1
获取锁 A -> OK
CPU 2
获取锁 B -> OK
试图获取 B -> spin试图获取 A -> spin
这两个CPU将永远自旋,等待着对方释放自旋锁。它会使系统看起来象崩溃了一样。
7.2. 防备死锁
教科书告诉你说:总是以相同的顺序获取锁,你就不会造成死锁了。而实践则会告诉
你,这种方法不具有扩展性:当我创建一把锁的时候,我对内核的理解还真不足以计算出在
5000种锁的层次中,哪一种合适。
最好的锁是被封装了的:它们不会暴露在头文件中,不会被它所在文件之外的函数持
有。你可以读一下这种代码,来看看为什么它永远不会死锁。因为它持有一把锁时,是决不
会试图去获取另一把的。使用你的代码的人甚至不需要知道你曾经使用了锁。
一个经典问题是当你提供了回调函数或钩子函数:如果你在持有锁的情况下调用它们,
你将冒死锁的危险:简单死锁或致命拥抱(谁能预知回调函数将会干些什么?)。记住,别的
程序员是极度想得到你的(原文:the other programmers are out to get you,实在不懂
如何翻译才好;-( ),所以不要这样做。
7.2.1. 对死锁的防备过当
死锁诚然会带来问题,然而不如数据腐败(data corruption)之甚。试想这样的一段
代码,它获取一把读锁,搜索一个链表,如果没找到想要的数据,就释放掉读锁,然后获取
一把写锁,把对象插入到链表:这样的代码存在竞态问题。
如果你看不出为什么,那就请离我的代码他妈的远点儿。
7.3. 竞态Timers: 一次内核游戏
Timers会造成它们独有的竞态条件。考虑一个对象集合(链表,Hash表等),每个对
象都有一个timer,该timer负责销毁它所属的对象。
如果你打算销毁整个对象集合(例如模块卸载的时候),你可能会这样做:
/* 这段代码太太太太太糟糕了:如果想更糟糕的话,你还可以使用“匈牙利命名法”
*/
spin_lock_bh(&list_lock);
while (list) {
struct foo *next = list->next;
del_timer(&list->timer);
kfree(list);
list = next;
}
spin_unlock_bh(&list_lock);
在SMP机器上,这段代码早晚要导致崩溃。因为一个timer可能在spin_lock_bh()之
前已经gone off了,它将只有在我们调用spin_unlock_bh()之后才成功的获取锁,然后试
图去释放掉响应的元素(而元素已经释放掉了!)
这个问题可以通过检查del_timer()的返回值来预防:如果返回1,说明timer已经被
删除了;如果返回0,说明(在这个例子中)它正在运行。所以我们应当这样:
retry:
spin_lock_bh(&list_lock);
while (list) {
struct foo *next = list->next;
if (!del_timer(&list->timer)) {
/* Give timer a chance to delete this */
spin_unlock_bh(&list_lock);
goto retry;
}
kfree(list);
list = next;
}
spin_unlock_bh(&list_lock);
另一个常见问题是那些自启动的(通过在timer的函数的最后调用add_timer()就会实
现定时器的自启动 )timers的删除操作。这是一个很常见的易导致竞态的情形,你该使用
del_timer_sync() (include/linux/timer.h)来处理这种情况。该函数的返回值是:我们要
删除的timer最终被停止自启动之前,删除动作应该执行的次数。
7.4. 混乱的Sparc处理器
Alan Cox说:“在sparc机器上,中断的禁止/激活动作处在寄存器窗口中”。Andi
Kleen说“当你从另一个函数用标志(译注:flag, 指如spin_lock_irqsave时的第二个参数)
恢复寄存器状态时,就会弄乱所有的寄存器窗口”
所以永远不要把由spin_lock_irqsave()保存的标志传递给另一个函数(除非它被声
明为inline的)。通常没人会这么干,但你能记住这点最好。David Miller在直接方式下无
法做任何事情。(我敢这么说,是因为我有印象他和一位PowverPC维护者在对待折衷点问题
上态度是什么样子的。)
第8章. 加锁速度
在考虑使用锁的代码的速度问题上,主要有三件事需要关注。首先是并发性:当锁被
别人持有时,有多少事情是我们需要等待的。其次是,需要花多少时间来 获取与释放一把
无争议的锁。第三呢,是尽量使用更少的──或曰更灵活的──锁。当然,这里我已经假定
锁的使用非常频繁,否则你都不用担心效率问题的。
并发性依赖于锁被持有多长时间:你持有锁的时间,就应该同确实需要持有它的时间
一样长,不要更长。在上面那个关于cache的例子中,我们在创建对象时是不持有锁的,只
有在已经准备好把它插入到链表中时才去获取锁。
获取锁的时间依赖于加锁操作对于管道线(pipeline stalls)做了多少破坏性工作,
以及本CPU是最后一个试图获取该锁的CPU(亦即:对本CPU来说,锁是否为缓存密集的
(cache-hot))的可能性有多大:在具有很多CPU的机器上,这种可能性迅速降低。考虑一个
700MHz的Intel Pentium III处理器:一条指令大约花费0.7纳秒,一次原子增加操作大约
花费58纳秒,一次缓存密集的的加锁操作大约花费160纳秒,而从其他CPU上的一次缓存
行转换还要额外花费170到360纳秒。(这些数据来自于Paul McKenney的Linux Journal
RCU article )
这两个目标会发生冲突:通过把锁分成不同的的部分,可以减少持有锁的时间(例如
上面例子中“每个对象一把的锁”),但是这增加了获取锁这一操作的数量,因而结果一般
是比只有一把锁的情况要慢一些。这也是推崇加锁简洁性的一个理由。
第三个考虑点放到了下面:的确存在一些方法能够减少锁的使用。
8.1. 读/写锁变体
自旋锁与信号量都具有读/写变体:rwlock_t和struct rw_semaphore。它们把使用
者分成了两类:读者和写者。如果你只需要读数据,你该获取读锁;但如果你需要写数据,
你需要获取写锁。可以有很多使用者同时获取读锁,但写锁却只能有单个使用者持有。
如果你的代码可以简洁的划分出读和写的界限(就象我们上面的cache代码那样),而
且读者通常需要很长时间的持有读锁,使用读写锁会更好。它们要比普通的锁要稍微慢点儿,
所以在实践中rwlock_t通常并不很值得使用。
8.2. 避免锁的使用:RCU
有一种特殊的读/写锁定方法叫做RCU(Read Copy Update),读者可以完全避免使用
锁:由于我们希望我们的cache被读取远比被更新来的频繁(否则的话,还引入cache干什
么?),使用RCU就是一个可选择的优化。
如何移除读锁呢?移除读锁意味着写者可能在存在读者的情况下更新链表。这很简单:
要向链表中添加元素的代码足够小心,我们就可以在它添加的同时读取链表。例如,把
new
元素添加到
list
链表中:
new->next = list->next;
wmb();
list->next = new;
wmb()是一个写内存屏障。它保证了第一个操作(设置新元素的next指针)是完整的,
而且立即为其它CPU所见,这发生在第二个操作(把新元素添加到链表中)之前。这是很重
要的,因为现代CPU和编译器可能会改变指令的执行顺序,除非明确告诉它们不要这样:我
们希望一个读者要么完全看不到新元素的插入,要么能够看到新元素插入之后的next指针
正确指向了链表的剩余部分。
幸运的是,有一个函数专门添加标准的struct list_head链表:list_add_rcu()
(include/linux/list.h)。
从链表中删除元素更简单:假设我们要删除的元素为A,我们让指向A的指针重新指
向A的后继者,读者要么完全看到了它,要么完全忽略了它。
list->next = old->next;
有一个list_del_rcu()函数(include/linux/list.h)来做该工作(而普通版本的删
除操作会毒害(poison)要移除的对象,这是我们不愿看到的)。
读者也一定要小心:有些CPU会观察next指针以便预取下一个元素,但这样的话,
如果next指针恰恰在这时候发生了变化,那么这些CPU预取的(pre-fetched)内容就是错误
的。还好,有一个list_fro_each_entry_rcu()函数(include/linux/list.h)可以帮助你。
当然,写者可以只使用list_for_each_entry() (译者案:这里的“只使用”,是说不用
但是是否需要一个XXX_rcu()这样的函数),因为不可能同时有两个写者。
我们的困难最终是:什么时候我们可以真正的删除元素?记住,一个读者可能正在访
问要被删除的元素:如果我们释放掉这个元素,而让next指针指向新元素,读者将立即访
问到垃圾内存并崩溃。我们必须等待到所有正在遍历链表的读者都结束了遍历,才能真正释
放掉元素。我们用call_rcu()函数注册一个回调函数,当所有读者结束遍历时,它会真正
销毁对象。
但是,RCU如何得知什么时候读者结束遍历了呢?首先,读者总是在
rcu_read_lock()/rcu_read_unlock()之间执行遍历:这会禁止抢占(而且只是禁止抢占,
其它什么都不做),因此读者在遍历链表期间不会睡眠。
RCU一直等待到其他的CPU至少睡眠了一次:因为读者不会在读取过程中睡眠,此时
我们就知道我们等待其结束的那些读者终于都结束了。于是回调函数被调用。真实的RCU代
码要比这里讲述的更优化些,但这里讲的是它的基础。
--- ectlock 2003-12-11 17:15:03.000000000 +1100
+++ te 2003-12-11 17:55:14.000000000 +1100
@@ -1,15 +1,18 @@
#include
#include
#include
+#include
#include
#include
struct object
{
- /* 下面两个域由cache_lock保护 */
+ /* 下面由RCU保护 */
struct list_head list;
int popularity;
+ struct rcu_head rcu;
+
atomic_t refcnt;
/* 一旦创建就不要更改. */
@@ -40,7 +43,7 @@
{
struct object *i;
- list_for_each_entry(i, &cache, list) {
+ list_for_each_entry_rcu(i, &cache, list) {
if (i->id == id) {
i->popularity++;
return i;
@@ -49,19 +52,25 @@
return NULL;
}
+/* 一旦我们知道没有读者了,立即最终丢弃对象. */
+static void cache_delete_rcu(void *arg)
+{
+ object_put(arg);
+}
+
/* 必须在持有cache_lock的情况下调用 */
static void __cache_delete(struct object *obj)
{
BUG_ON(!obj);
- list_del(&obj->list);
- object_put(obj);
+ list_del_rcu(&obj->list);
cache_num--;
+ call_rcu(&obj->rcu, cache_delete_rcu, obj);
}
/* 必须在持有cache_lock的情况下调用 */
static void __cache_add(struct object *obj)
{
- list_add(&obj->list, &cache);
+ list_add_rcu(&obj->list, &cache);
if (++cache_num > MAX_CACHE_SIZE) {
struct object *i, *outcast = NULL;
list_for_each_entry(i, &cache, list) {
@@ -85,6 +94,7 @@
obj->popularity = 0;
atomic_set(&obj->refcnt, 1); /* cache本身占一个引用数 */
spin_lock_init(&obj->lock);
+ INIT_RCU_HEAD(&obj->rcu);
spin_lock_irqsave(&cache_lock, flags);
__cache_add(obj);
@@ -104,12 +114,11 @@
struct object *cache_find(int id)
{
struct object *obj;
- unsigned long flags;
- spin_lock_irqsave(&cache_lock, flags);
+ rcu_read_lock();
obj = __cache_find(id);
if (obj)
object_get(obj);
- spin_unlock_irqrestore(&cache_lock, flags);
+ rcu_read_unlock();
return obj;
}
注意读者会在__cache_find()中改变popularity域的值,但是没有使用锁。我们的
方案应该把它变成atomic_t类型的,但是对本例来说我们不会导致竞态条件:近似的结果
就已经不错了,所以我没做改变。
结果是,cache_find()函数并不需要与其它函数同步,因此它在SMP上几乎跟在UP
上一样快。
此处还存在一个更纵深的可能的优化:想一想我们最初的cache代码,那时没有引用
计数,无论何时调用者想访问对象,就要持有锁。这仍然是可能的:如果你持有锁,就没人
能够删掉对象,因此你就不必读取并写回引用计数了。
由于RCU中的“读锁”会禁止抢占(而且只是禁止抢占,其它什么都不做),调用者
如果调用cache_find()或object_put()之前已经禁止抢占了,就并不需要读取并写回引用
计数:我们可以把__cache_find()变成非static的从而暴露给其它文件,这时候调用者就
可以直接调用这些函数了。
此处的优势是:引用计数并不被写入,对象并不被更改,这在SMP机器上会非常快─
─由于CPU的caching的原因。
8.3. Per-CPU数据
另一种使用相当广泛的避免锁的技术是,为每个CPU使用数据的一份拷贝。例如,如
果你想为一个普通的条件保持一个计数,你可能会使用自旋锁和一个计数器,这简单而漂亮。
如果它表现的太慢了(通常不会,但是如果你的确测试到这种情况发生了),你就该改
变一下策略:为每个CPU保持一个计数器,这样,没有一个计数器需要互斥的锁来保护。参
考DEFINE_PER_CPU(),get_cpu_var()和put_cpu_var() (include/linux/percpu.h)。
Per-CPU计数器的另一个用场是local_t数据类型,和cpu_local_inc()以及其它的
相关函数。这在某些体系结构上是比简单的加锁代码更高效的。(include/asm/local.h)
注意,对计数器来说,根本不存在简单而可靠的方法可以精确的得到它的值而不需要
引入锁。只不过某些情况下这不是问题罢了。
8.4. 中断服务例程通常使用哪些数据
如果数据只是在中断服务例程内部访问,你根本不需要锁:内核本身已经保证了中断
服务例程不会同时在多个CPU上运行。
Manfred Spraul指出,即使数据会偶尔被用户上下文或softirqs/tasklets访问,你
仍然可以这样。中断服务例程自身不需要锁,其它所有的访问者这样做:
spin_lock(&lock);
disable_irq(irq);
...
enable_irq(irq);s
spin_unlock(&lock);
disable_irq()禁止了中断服务例程的运行(如果此刻已经在别的CPU上运行了,那就
等待它的结束)。自旋锁禁止了同时刻其它任何可能的访问。自然,这比单个
spin_lock_irq()调用要慢些,所以只有这种访问很少发生时这种用法才是合理的。
第9章. 中断里调用哪些函数是安全的?
内核中许多函数会导致睡眠(亦即,直接或间接地调用了scheduler()):当持有锁或
禁止了抢占时,你不可以调用它们。这同样意味着只有在用户上下文才可以调用:在中断里
调用它们是非法的。
9.1. 一些可能睡眠的函数
下面列出了最常见的几个,但通常来讲,你需要阅读代码来判断其它的函数是否可以
在中断里安全的调用。如果每一个人都是在可睡眠环境下调用某个函数的,那么多半你也要
保证可睡眠的环境(译者案:原文是“If everyone else who calls it can sleep, you
probably need to be able to sleep, too.”) 特别地,注册与注销函数通常需要在用户
上下文中调用,它们可能睡眠。
•
访问用户空间:
copy_from_user()
•
copy_to_user()
•
get_user()
•
put_user()
kmalloc(GFP_KERNEL)
•
•
•
down_interruptible() 和 down()
有一个down_trylock()函数,它可以在中断上下文中调用,不会睡眠。Up()决不会导
致睡眠。
9.2. 一些不会睡眠的函数
有些函数可以在任何上下文中调用,可以持有任何的锁。
•
•
•
printk()
kfree()
add_timer() 和 del_timer()
第10章. 进一步阅读
•
•
Documentation/: 这是Linus Torvalds的内核源代码中的自旋锁教程
Unix Systems for Modern Architectures: Symmetric Multiprocessing and
Caching for Kernel Programmers.
Curt Schimmel的一部对内核级加锁的极好的著作(不是为Linux写的,但几乎都适
用)。书可能贵了些,但对于理解SMP加锁来说,它值这个价钱。[ISBN:]
(译者案:中文译本《现代体系结构上的Unix系统:内核程序员的SMP和Caching技
术》,人民邮电出版社2003年4月出版,定价¥39,共289页,翻译非常不错,是本
值得阅读的好书。)
第11章. 致谢
感谢Telsa Gwynne在DocBook化文档上的帮助,整理与添加风格。
感谢Martin Pool,Phlipp Rumpf,Stephen rothwell,Paul Mackerras,Ruedi
Aschwanden,Alan Cox,Manfred Spraul,Time Waugh,Pete Zaitcev,James
Morris,Robert Love,Paul McKennney,John Ashby,感谢他们的校对、勘误、争论和评
注。
感谢小团体(译者案:原词cabal,原意是阴谋团体,似乎具有反讽意味,讽刺有些
人把Linux内核开发团队称为cabal的FUD行为)没有影响该文档。
术语
(译者案:为了方便理解,本页所有术语条目一律保留其英文)
抢占,preemption
在2.5内核之前,或编译时CONFIG_PREEMPT未打开,内核中用户上下文的进程不会彼
此抢占(亦即:只要你占有CPU,你就一直占有,直到自己放弃或者发生硬件中断)。
在2.5.4以后且打开了CONFIG_PREEMPT,事情改变了:在用户上下文,更高优先级的
任务可以抢占当前任务:自旋锁为此改写,以添加禁止抢占的效果,即使在UP上也是
如此。
bh
下半部:由于历史原因,带有'_bh'后缀的函数现在通常指所有的软中断。例如
spin_lock_bh()将阻止任何软中断在本CPU上的运行。原来的BH是不该继续使用的,
它终将被tasklets取代。在任一给定时刻,一个下半部只可能有一个实例在运行。
(译者注:Linux内核的各种下半部的命名混乱不堪,例如上面这句其实就不太严谨。
tasklet/timer诚然是“任何时刻只可能有一个实例在运行”,但是一个softirq就有
可能同时在多个CPU上运行。推荐阅读Robert Love的
Linux Kernel Development
一
书来理清个中关系)
硬件中断/硬中断,Hardware Interrupt/Hardware IRQ
硬件中断请求。在硬件中断处理函数中,in_irq()宏返回true。
中断上下文,Interrupt Context
非用户上下文:正在处理一个硬件中断或软中断。这种情况下,in_interrupt()宏返
回true。
对称多处理器,SMP
对称多处理器:为多CPU机器编译的内核。(CONFIG_SMP=y).
软中断/softirq,Software Interrupt/softirq
软中断处理程序。in_irq()返回false;in_softirq()返回true。Tasklets和
softirqs都属于软中断。
严格来讲,一个softirq是一个最多能有32个枚举类型的软中断,它可以同时运行在
多个CPU上。有时softirq一词也用来指谓tasklets(也就是说,指谓所有的软中断)。
tasklet
一种可动态注册的软中断,保证了同一时刻只能有一个CPU在运行它。
定时器,timer
一种可动态注册的软中断,它在给定时刻(或接近给定时刻)运行。当运行时,它就象
tasklet一样(事实上它们是作为TIMER_SOFTIRQ被调用的)。
单处理器,UP
单处理器:非SMP。(CONFIG_SMP=n)
用户上下文,User Context
内核代表某个进程(亦即,一次系统调用或陷阱)或内核线程在运行。你可以用current
宏得知究竟是哪个进程。注意不要与“用户空间”一词搞混。它可以被硬件或软中断
打断执行。
用户空间,Userspace
进程在内核之外执行自己的代码。
翻译后记
由于本人水平所限,加上没有SMP环境试验一些理解正确与否,文档误译之处
肯定不少(有些不会翻译的干脆就保留原文了,如您所见)。 无论您有什么观点,请
与albcamus@联系,欢迎批评指正,我会尽量跟踪改进本文档的翻译。如果您
英文可以的话,还是看原版吧,作者的行文并不难懂。
在翻译过程中,alert7前辈为我提供了他的译稿做参考,并指正了很多问题,
在此向他表示感谢!──当然,误译之处只由我自己负责。
趁十一熬夜,05年10月05日凌晨6点


发布评论