Go36-29,30-原子操作
原子操作
對於一個Go程式來說,GO語言執行時系統中的排程器會恰當的安排其中所有的goroutine的執行。不過,在同一時刻,只會有少數的goroutine真正處於執行狀態。為了公平起見,排程器會頻繁的切換這些goroutine。這個中斷的時機有很多,任何兩個語句執行的間隙,甚至是在某條語句執行的過程中都是可能的。即使這些語句在臨界區之內也是一樣的,互斥鎖雖然能保護臨界區中的程式碼序列執行,但是不能保證這些程式碼的原子性 (atomicity)。
原子操作的特點
真正能夠保證原子性執行的工具只有原子操作 (atomic operation)。
原子操作在進行的過程中是不允許中斷的。在底層,這會由CPU提供晶片級別的支援,所以絕對有效。
原子操作可以完全的消除競態條件,並能夠絕對的保證併發安全。並且它的執行速度比其他的同步工具快得多,通常會高出好幾個數量級。
缺點正是因為原子操作不能被中斷,所要它需要足夠簡單,並且要求快速。因此,作業系統層面值對針對二進位制位或整數的原子操作提供支援。
Go語言的原子操作是基於CPU和作業系統的,所以它也只針對少數資料型別的值提供了原子操作函式。這些函式都在標準庫的sync/atomic中。
sync/atomic 包
sync/atomic 包中可以做的原子操作有:
- 加法(add)
- 比較並交換(compare and swap),簡稱CAS
- 載入(load)
- 儲存(store)
- 交換(swap)
這些函式針對的資料型別並不多。但是,對這些型別中的每一個,sync/stomic包都會有一套函式給予支援。這些資料型別有:
- int32
- int64
- uint32
- uint64
- uintptr
- unsafe.Pointer,這個型別在包裡沒有提供原子加法操作的函式
- atomic.Value,包裡還提供了這個型別,可以被用來儲存任意型別的值
函式需要傳入被操作值的指標
原子操作函式的第一個引數值都應該是那個被操作的值,並且是傳入指標,比如:*int32。
原子操作函式需要的是被操作值的指標,而不是值本身。即使是unsafe.Pointer型別雖然本身已經是指標型別,但是原子操作函式裡還要要這個值的指標。
只要原子操作函式拿到了被操作值的指標,就可以定位到儲存該值的記憶體地址。只有這樣,才能夠通過底層的指令,操作這個記憶體地址上的資料。
加法操作也可以用來做減法
包裡只提供了加法操作的函式,沒有減法操作的函式,不過是可以實現減法的。比如:atomic.AddInt32。函式的第二個引數代表的差量,它的型別是int32,這個型別是有符號的。這裡可以傳個負整數就是做減法了。
對於atomic.AddInt64也是型別的。不過對於atomic.AddUint32和atomic.AddUint64要做原子減法就不能這麼直接了,因為第二引數的值是uint32和uint64,這些型別是無符號的。比如要減3,差量就是-3,要先把差量轉換為有符號的型別比如int32,然後再把該值轉換為uint32,用表示式描述就是:
uint32(int32(-3))
不過上面這樣寫,會使編譯器報錯,因為這麼做其實會讓表示式的結果值溢位。不過可以先把int32(-3)先賦值給一個臨時變數比如名字就叫delta,來繞過編譯器的檢查。
上面的那種方式比較好理解,另外還有一種方式第二的引數用下面的表示式:
^uint32(-(-3)-1)
上面這麼做的原理,簡單來說就是取補碼,具體就要去了解一下計算機中的原碼、補碼、反碼,以及是如何實現減法的了。
上面兩中方式是等價的,下面是例項程式碼:
package main import ( "fmt" "sync/atomic" ) func main() { num := uint32(18) delta := int32(-3) atomic.AddUint32(&num, uint32(delta)) fmt.Println(num) atomic.AddUint32(&num, ^uint32(-(-3)-1)) fmt.Println(num) }
比較並交換
比較並交換操作即CAS操作,是有條件的交換操作,只有在條件滿足的情況下才會進行值的交換。
交換,指的是把新值賦值給變數,並返回變數的舊值。
在進行CAS操作的時候,函式會先判斷被操作變數的值,是否與預期的舊值相等。如果相等,就把新值賦給該變數,並返回true以表明交換操作已經進行。否則就忽略交換操作,並返回false。CAS操作並不是一個單一的操作,而是一種操作組合。這與其他原子操作都不同。正式因為如此,它的用途要更廣泛一些。比如,將它與for語句聯用,就可以實現一種簡易的自旋鎖 (spinlock)。
自旋鎖自旋鎖(spinlock):是指當一個執行緒在獲取鎖的時候,如果鎖已經被其它執行緒獲取,那麼該執行緒將迴圈等待,然後不斷的判斷鎖是否能夠被成功獲取,直到獲取到鎖才會退出迴圈。
for { if atomic.CompareAndSwapInt32(&num, 10, 0) { fmt.Println("檢查到num為10,清0") break } time.Sleep(time.Millisecond * 300) }
下面是程式碼完整的展示,實現了建議的自旋鎖:
package main import ( "fmt" "sync/atomic" "time" ) func main() { done := make(chan struct{}) var num int32 // 定時增加num的值 go func() { defer func() { done <- struct{}{} }() for { time.Sleep(time.Millisecond * 500) newNum := atomic.AddInt32(&num, 1) fmt.Println("num:", newNum) if newNum > 10 { break } } }() // 定時檢查num的值,如果等於10,就設定為0 go func() { defer func() { done <- struct{}{} }() for { if atomic.CompareAndSwapInt32(&num, 10, 0) { fmt.Println("檢查到num為10,清0") break } time.Sleep(time.Millisecond * 500) } }() <- done <- done fmt.Println("Over") }
在for語句中的CAS操作,可以不停的檢查某個需要滿足的條件,一旦條件滿足就退出for迴圈。這相當於只要條件不滿足,當前流程就會被一直阻塞 在這裡。
這個在效果上與互斥鎖型別,不過適用場景不同。互斥鎖總是假設共享資源的狀態會被其他的goroutine頻繁的改變,是一種悲觀鎖 。而這裡是假設共享資源狀態的改變並不頻繁,所以你的操作一般都是如期望的那樣成功,是一種更加樂觀,更加寬鬆的做法,就是樂觀鎖 。
下面的例子,啟用了多個goroutine都要對num的值做加法操作。
package main import ( "time" "sync/atomic" "fmt" ) var done chan struct{} = make(chan struct{}) var num int32 func CompareAndAdd(id, times int, increment int32) { defer func() { done <- struct{}{} }() for i := 0; i < times; i++ { for { currNum := atomic.LoadInt32(&num)// 先獲取當前的值 newNum := currNum + increment// 這裡希望對num做加法 // 假設是一個耗時的操作,就是有可能在這段時間裡有別的goroutine已經修改了num time.Sleep(time.Millisecond * 300) // 比較現在num的值和操作前獲取到的值是否一致,如果一致,表示表裡沒有別修改過,可以更新為新值 // 如果不一致,表示在這段時間裡num已經被別的goroutine修改過了,必須重新來過 if atomic.CompareAndSwapInt32(&num, currNum, newNum) { fmt.Printf("更新num[%d-%d]: +%d = %d\n", id, i, increment, newNum) break } else { fmt.Printf("更新num失敗[%d-%d],重試...\n", id, i) } } } } func main() { go CompareAndAdd(1, 6, 2) go CompareAndAdd(2, 4, 3) go CompareAndAdd(3, 3, 4) <- done <- done <- done fmt.Println("Over") }
這裡在把加法後的新值賦值給原來的變數num前,先檢查此時num的值是否發生過變化了,如果沒有發生變化,就可以將num設定為新值。否則就從頭在做一次加法執行、檢查、賦值,直到成功為止。在假設共享資源狀態的改變並不頻繁的前提下,這種實現是比悲觀鎖更好的。
讀寫操作都要實現原子操作
在已經保證了對一個變數的寫操作都是原子操作,比如:加法、儲存、交換等等。在對它進行讀操作的時候,依然有必要使用原子操作。
參考讀寫鎖,寫操作和讀操作之間是互斥的,這是為了防止讀操作讀到還沒有被修改完的值。如果讀操作讀到一半就被中斷了,等再回來繼續讀取的時候,就讀到了修改前後兩部分的內容。這顯然破壞了值的完整性。所以,一旦決定對一個共享資源進行保護,就要做到完全的保護。
適用場景
由於原子操作函式只支援非常有限的資料型別,所以在很多應用場景下,互斥鎖更加合適。不過如果當前場景下可以使用原子操作,就不要考慮互斥鎖了。
因為原子操作函式的執行速度要比互斥鎖快的多。而且,使用起來也更加簡單,不會涉及臨界區的選擇,以及死鎖等問題。就是原子操作更加高效,而互斥鎖適用場景更廣,優先考慮是否可以使用原子操作。
原子變數
為了擴大原子操作的適用範圍,Go語言在1.4版本之後,在sync/atomic包中添加了一個新型別Value。此型別的值相當於一個容器,可以被用來原子的儲存和載入任意的值。atomic.Value型別是開箱即用的,宣告一個該型別的變數之後就可以直接使用了,可以稱它為原子變數 。而原子變數的值,可以稱為原子值 。
這個型別使用起來很簡單,只有兩個指標方法:Store()和Load(),不過還是有一些需要注意的地方。
原子值的複製
一旦atomic.Value型別的值,就是原子值被真正使用,它就不應該再被複制了。
只要用它來儲存值了,就相當於開始真正使用了。atomic.Value型別屬於結構體型別,而結構體型別屬於值型別。
所以,複製該型別的值會產生一個完全分離的新值。這個新值相當於被複制的那個值的一個快照。之後,不論後者儲存的值怎樣改變,都不會影響到前者的使用,反之亦然。
這個是進行驗證的示例程式碼:
func main() { var box atomic.Value box2 := box// 原子值真正使用之前可以被複制 v1 := [...]int{1,2,3} box.Store(v1)// 對box1的改變,不會影響到box2 fmt.Println(box.Load()) fmt.Println(box2.Load()) }
上面我把原話都引用過來了,下面是我的理解。上面的box2 := box
這句,編譯器是有綠色的提示的,不影響執行但是應該要引起我們注意。然後在原始碼裡也找到了一些建議:
// A Value provides an atomic load and store of a consistently typed value. // The zero value for a Value returns nil from Load. // Once Store has been called, a Value must not be copied. // // A Value must not be copied after first use. type Value struct { noCopy noCopy v interface{} } // noCopy may be embedded into structs which must not be copied // after the first use. // // See https://github.com/golang/go/issues/8005#issuecomment-190753527 // for details. type noCopy struct{}
看著意思也就是上面說的,使用之後就不要再複製了。
不過既然這個型別是開箱即用的,那麼只要再宣告一個變數使用就好了,沒有必要複製一個來使用。另外就是因為這是一個值型別,所以賦值的是副本,對原值的改變不會影響到副本。如果需要,就用指標。
上面示例中複製的用法很傻,應該不會也想不到要這麼用。仔細想想,真正需要複製該值的情況可能是作為函式的引數,就是先宣告一個開箱即用的原子值,然後在不同的函式裡都把這個原子值作為引數。這裡是可以的,而且也很方便。只要知道每個函式裡都是不同的原子值就行了,就是值型別傳參要注意的那些問題。下面是我想的一個場景:
package main import ( "fmt" "sync/atomic" ) func loadBox(box atomic.Value, v interface{}) { box.Store(v) fmt.Println(box.Load()) } func main() { var box atomic.Value// 下面呼叫了3次函式,就是複製了box3次 v1 := [...]int{1,2,3} loadBox(box, v1) v2 := "Hello" loadBox(box, v2) v3 := 123 loadBox(box, v3) }
原子值儲值的規則
用原子值來儲值有兩條強制性的規則:
- 不能用原子值儲存nil
- 向原子值儲存額第一個值,決定了它今後能且只能儲存哪一個型別的值
就是不能把nil作為引數傳入原子值的Store方法,否則就會引發panic。
這裡還有注意介面型別的變數,它的動態值是nil,但是動態型別卻不是nil,所以它的值就不等於nil。這樣的一個變數的值是可以被存入原子值的。
就是不能存nil,box.Store(nil)
這樣是要panic的。只要是有型別的,值是nil也是可以的,下面這樣用是沒問題的:
func main() { var box atomic.Value var v1 chan struct{} fmt.Println(v1) box.Store(v1) fmt.Println(box.Load()) }
上面提到了介面,其實Stroe方法接收的引數就是空介面:
func (v *Value) Store(x interface{}) { // 省略函式內容 }
接著上面的說,Store接收的引數是一個空介面,並且還說了不能是nil。所以只要是不是nil都可以作為引數。這只是作為第一次使用的情況。
一旦向原子值儲存了第一個值,就決定了型別,之後再要儲存,就必須還是同樣的型別了。這個規則,就是通過介面也是繞不開的。原子值內部是依據被儲存值的實際型別來做判斷的。
這裡還有個問題,我們是無法通過某個方法獲知一個原子值是否已經被真正使用。並且,也沒有辦法通過常規的途徑得到一個原子值可以儲存值的實際型別。這使得誤用原子值的可能性大大增加,尤其是在多個地方使用同一個原子值的時候。
通過下面的示例,可以理解一下:
func main() { var box atomic.Value box.Store("")// 存入字串 box2 := box// 在真正使用之後,就不應該被複制 // box2.Store(1)// 存字串以外的型別就會引發panic box2.Store("1") _ = box2 }
使用建議
一、不要把內部使用的原子值暴露給外界。比如,宣告一個全域性的原子變數並不是一個正確的做法。這個變數的訪問許可權最起碼也應該是包級私有的。
二、如果不得不讓包外,或者模組外的程式碼使用你的原子值,那麼可以宣告一個包級私有的原子變數,然後再通過一個或多個公開的函式,讓外界間接的使用到它。注意,這種情況下不要把原子值傳遞到外界,不論是傳遞原子值本身還是它的指標。
三、如果通過某個函式可以向內部的原子值儲存值的話,那麼就應該在這個函式中先判斷被儲存值型別的合法性。若不合法,則應該直接返回對應的錯誤值,從而避免panic的發生。
四、如果可能的話,我們可以把原子值封裝到一個數據型別中,比如一個結構體。這樣我們既可以通過該型別的方法更加安全地儲存值,又可以在該型別中包含可村儲值的合法型別資訊。
概括一下,上面說的就是一個最佳實踐,用一個結構體來封裝。並且解決了前面提到的沒有辦法獲取到原子值儲存的實際型別的問題:
package main import ( "reflect" "os" "fmt" "sync/atomic" ) // 建立結構體,封裝atomic.Value和村儲值的合法型別 // 欄位都是私有的,下面提供了4個可匯出的方法 type atomicValue struct { v atomic.Value t reflect.Type } // 提供方法,返回儲存值的合法型別 func (av *atomicValue) TypeOfValue() reflect.Type { return av.t } // 提供方法,儲存值。儲存之前先檢查型別 func (av *atomicValue) Store(v interface{}) error { if v == nil { return fmt.Errorf("不能儲存nil") } t := reflect.TypeOf(v) if t != av.t { return fmt.Errorf("型別不正確, 需要: %s, 實際: %s", av.t, t) } av.v.Store(v) return nil } // 提供方法,獲取值,雖然示例中沒有用到 func (av *atomicValue) Load() interface{} { return av.v.Load() } // 建立結構體的方法,相當於構造方法 func NewAtomicValue(x interface{}) (*atomicValue, error) { if x == nil { return nil, fmt.Errorf("不能儲存nil") } return &atomicValue{ t: reflect.TypeOf(x),// 獲取變數的型別,返回reflect.Type型別 }, nil } func main() { v := fmt.Errorf("隨便的錯誤") box, err := NewAtomicValue(v) if err != nil { fmt.Fprintf(os.Stderr, "ERROR: %s\n", err) } fmt.Printf("合法的型別是: %s\n", box.TypeOfValue()) v2 := fmt.Errorf("還是一個錯誤型別") err = box.Store(v2) if err != nil { fmt.Fprintf(os.Stderr, "ERROR: %s\n", err) } fmt.Printf("儲存了一個值,型別是: %T\n", v2) fmt.Println("嘗試儲存一個其他型別的值") err = box.Store(1) if err != nil { fmt.Fprintf(os.Stderr, "ERROR: %s\n", err) } }
儲存引用型別
這裡還要特別強調一點:儘量不要向原子值中儲存引用型別的值。因為這很容易造成安全漏洞。儘量不要的意思就是要存還是可以存的,下面的示例中也給出了建議的方法:
package main import ( "fmt" "sync/atomic" ) func main() { var box atomic.Value v := []int{1,2,3}// 切片是引用型別 box.Store(v) v[1] = 4// 此處的操作不是併發安全的! fmt.Println(box.Load())// 儲存的值被改變了 // 正確的做法如下: // 下面這個函式就是把引用型別複製一份出來,然後儲存起來 // 類似於把一個值型別傳遞給函式的效果 store := func(v []int) { replica := make([]int, len(v)) copy(replica, v) box.Store(replica) } store(v) v[2] = 5// 再試著改變切面的值 fmt.Println(box.Load())// 儲存的是副本的值,不會被上面的改變影響 }
這裡把一個切片型別儲存了原子值。切片型別屬於引用型別,所以在外面依然可以改變切片的值。這相當於繞過了原子值而進行了非併發安全的操作。
這裡應該先為切片建立一個完全的副本,然後再把副本儲存box。如此一來,在對原來的切片做修改都不會破壞box提供的安全保護。
總結
原子操作明顯比互斥鎖要更加輕便,但是限制也很明顯。所以如果可以使用原子操作的話,一定是用原子操作更好。
現在有了原子值,突破了一些原子操作的限制。在原子值與互斥鎖之間選擇的時候,就需要仔細考慮了。這篇裡講了很多使用原子值時候的注意事項,可能用的時候就不如互斥鎖這麼好用了。
另外在CAS中還會遇到一個ABA問題,而原子型別應該就會有這個ABA問題。此時就要用互斥鎖了,除非業務對ABA問題不敏感