訊號量核心原始碼
ofollow,noindex" target="_blank">https://blog.csdn.net/u012603457/article/details/52971894
之前的一片部落格介紹了用於Linux核心同步的自旋鎖,即使用自旋鎖來保護共享資源,今天介紹另外一種Linux核心同步機制——訊號量。訊號量在核心中的使用非常廣泛,用於對各種共享資源的保護。訊號量與自旋鎖的實現機制是不一樣的,用處也是不一樣的。首先,自旋鎖和訊號量都使用了計數器來表示允許同時訪問共享資源的最大程序數,但自旋鎖的共享計數值是1,也就是說任意時刻只有一個程序在共享程式碼區執行;訊號量卻允許使用大於1的共享計數,即共享資源允許被多個不同的程序同時訪問,當然,訊號量的計數器也能設為1,這時訊號量也稱為互斥量。其次,自旋鎖用於保護短時間能夠完成操作的共享資源,使用期間不允許程序睡眠和程序切換;訊號量常用於暫時無法獲取的共享資源,如果獲取失敗則程序進入不可中斷的睡眠狀態,只能由釋放資源的程序來喚醒。最後,自旋鎖可以用於中斷服務程式之中;訊號量不能在中斷服務程式中使用,因為中斷服務程式是不允許程序睡眠的。關於訊號量的基本知識已經講解完畢,接下來看看訊號量在核心裡面的實現,本文講解的核心版本是linux-2.6.24。
1 資料結構
struct semaphore { atomic_t count; int sleepers; wait_queue_head_t wait; };
訊號量使用的資料結構是struct semaphore,包含三個資料成員:count是共享計數值、sleepers是等待當前訊號量進入睡眠的程序個數、wait是當前訊號量的等待佇列。
2 訊號量使用
使用訊號量之前要進行初始化,其實只是簡單的設定共享計數和等待佇列,睡眠程序數一開始是0。本文重點講解訊號量的使用和實現。訊號量操作的API:
static inline void down(struct semaphore * sem)//獲取訊號量,獲取失敗則進入睡眠狀態 static inline void up(struct semaphore * sem)//釋放訊號量,並喚醒等待佇列中的第一個程序
訊號量的使用方式如下:
down(sem); ...臨界區... up(sem);
核心保證正在訪問臨界區的程序數小於或等於初始化的共享計數值,獲取訊號量失敗的程序將進入不可中斷的睡眠狀態,在訊號量的等待佇列中進行等待。當程序釋放訊號量的時候就會喚醒等待佇列中的第一個程序。
3 訊號量的實現
3.1 down(sem)
首先看函式的定義:
static inline void down(struct semaphore * sem) { might_sleep(); __asm__ __volatile__( "# atomic down operation\n\t" LOCK_PREFIX "decl %0\n\t"/* --sem->count */ "jns 2f\n" "\tlea %0,%%eax\n\t" "call __down_failed\n" "2:" :"+m" (sem->count) : :"memory","ax"); }
這裡麵包含了一些彙編程式碼,%0代表sem->count。也就是說先將sem->count減1,LOCK_PREFIX表示執行這條指令時將匯流排鎖住,保證減1操作是原子的。減1之後如果大於或等於0就轉到標號2處執行,也就跳過了down_failed函式直接到函式尾部並返回,成功獲取訊號量;否則減1之後sem->count小於0則順序執行後面的 down_failed函式。接下來看__down_failed函式的定義:
ENTRY(__down_failed) CFI_STARTPROC FRAME pushl %edx CFI_ADJUST_CFA_OFFSET 4 CFI_REL_OFFSET edx,0 pushl %ecx CFI_ADJUST_CFA_OFFSET 4 CFI_REL_OFFSET ecx,0 call __down popl %ecx CFI_ADJUST_CFA_OFFSET -4 CFI_RESTORE ecx popl %edx CFI_ADJUST_CFA_OFFSET -4 CFI_RESTORE edx ENDFRAME ret CFI_ENDPROC END(__down_failed)
pushl和popl是用於儲存和恢復暫存器的,CFI字首的指令用於指令對齊調整。重點在函式__down,下面來看該函式的定義:
fastcall void __sched __down(struct semaphore * sem) { struct task_struct *tsk = current; DECLARE_WAITQUEUE(wait, tsk); unsigned long flags; tsk->state = TASK_UNINTERRUPTIBLE; spin_lock_irqsave(&sem->wait.lock, flags); add_wait_queue_exclusive_locked(&sem->wait, &wait); sem->sleepers++; for (;;) { int sleepers = sem->sleepers; /* * Add "everybody else" into it. They aren't * playing, because we own the spinlock in * the wait_queue_head. */ if (!atomic_add_negative(sleepers - 1, &sem->count)) { sem->sleepers = 0; break; } sem->sleepers = 1;/* us - see -1 above */ spin_unlock_irqrestore(&sem->wait.lock, flags); schedule(); spin_lock_irqsave(&sem->wait.lock, flags); tsk->state = TASK_UNINTERRUPTIBLE; } remove_wait_queue_locked(&sem->wait, &wait); wake_up_locked(&sem->wait); spin_unlock_irqrestore(&sem->wait.lock, flags); tsk->state = TASK_RUNNING; }
fastcall表示一種快速呼叫方式,函式的前兩個引數由暫存器ecx和edx來傳遞,其餘引數仍使用堆疊傳遞。首先將程序設為不可中斷睡眠狀態,即不能通過訊號來喚醒,只能是核心親自喚醒。同時將程序的TASK_EXCLUSIVE標誌設為1,則wake_up()只會喚醒等待佇列中的第一個程序。然後將睡眠等待數加1,之後進入for迴圈。函式atomic_add_negative(sleepers - 1, &sem->count)將相當於sem->count += sleepers-1,然後返回sem->count,通過該函式進行訊號量獲取情況測試,返回結果為0則獲取資源,小於0則沒有獲取。這段程式碼使用sleepers和sem->count共同表示當前資源的使用情況。進入for迴圈後有兩種情況,一種是atomic_add_negative執行結果為0,即獲取了訊號量,此時將sleepers設為0並退出迴圈,同時喚醒等待佇列的第一個程序進行訊號量獲取測試;另一種是沒有獲取訊號量,將sleepers設為1並執行schedule()進入睡眠,被喚醒之後繼續執行for迴圈進行訊號量獲取測試。
注意,執行完執行一遍for指令後sleepers的值有兩種結果,一種是0,一種是1。如果0則表示有一個程序通過了訊號量獲取的測試,則atomic_add_negative(sleepers - 1, &sem->count)實際上是將sem->count執行了減1操作,這個操作會在下一個程序進行訊號量獲取測試的時候執行。如果是1則表示程序沒有通過訊號領獲取的測試,則atomic_add_negative(sleepers - 1, &sem->count)操作不會影響sem->count的值。也就是說,當程序進入__down時,sleepers只會有兩個值,一個是0,一個是1。0表示之前的程序獲取了訊號量,1表示之前的程序沒有獲取訊號量。如果之前程序獲取了訊號量,執行atomic_add_negative(sleepers - 1, &sem->count)時就會將sem->count的值減1;否則sem->count的值將保持不變。但是這個減1操作延遲到了下一個程序的執行期間,考慮到獲取訊號量之後程序會喚醒等待佇列裡的第一個程序,這個減1操作應該會很快就得到執行。
細心地小夥伴可能會注意到,首次獲取訊號量失敗的程序不是會執行sem->sleepers++操作嗎,這樣不就改變了sem->count的值了嗎?仔細回想獲取訊號量的過程,獲取失敗的時候會執行sem->count–操作的,因此剛好和sem->sleeper++相互呼應,結果就是不會改變sem->count的結果。即只有程序獲取訊號量後才會對sem->count進行減1操作,這個操作並不是馬上執行,而是後續程序進行訊號量獲取檢測的時候進行的
3.2 up(sem)
先看函式定義:
static inline void up(struct semaphore * sem) { __asm__ __volatile__( "# atomic up operation\n\t" LOCK_PREFIX "incl %0\n\t"/* ++sem->count */ "jg 1f\n\t" "lea %0,%%eax\n\t" "call __up_wakeup\n" "1:" :"+m" (sem->count) : :"memory","ax"); }
首先將sem->count加1,是原子操作,如果加1後sem->count大於0則說明沒有程序在等待訊號量資源,無須喚醒佇列中程序,直接跳轉到標號1處返回;否則執行__up_wakeup喚醒等待佇列中的程序。
ENTRY(__up_wakeup) CFI_STARTPROC FRAME pushl %edx CFI_ADJUST_CFA_OFFSET 4 CFI_REL_OFFSET edx,0 pushl %ecx CFI_ADJUST_CFA_OFFSET 4 CFI_REL_OFFSET ecx,0 call __up popl %ecx CFI_ADJUST_CFA_OFFSET -4 CFI_RESTORE ecx popl %edx CFI_ADJUST_CFA_OFFSET -4 CFI_RESTORE edx ENDFRAME ret CFI_ENDPROC END(__up_wakeup)
同樣,我們只關注函式__up
的定義:
fastcall void __up(struct semaphore *sem) { wake_up(&sem->wait); }
可以看到,__up的的工作就是喚醒等待佇列中的所有程序,但是由於sem等待佇列中的程序 的TASK_EXCLUSIVE標誌為 1,因此不會喚醒後續程序了。也就是說up(sem)操作實際上是將sem->count自增1,然後喚醒等待佇列中的第一個程序(如果有的話)。 4 小結 訊號量作為一種基礎的核心同步機制,使用非常廣泛。本文基於linux-2.6.24核心版本介紹了訊號量使用的資料結構和實現機制,同時介紹了訊號量與自旋鎖的區別。