CAS 的底層實現
java.util.concurrent 包的很多類(如 Semaphore,ConcurrentLinkedQueue)都提供了比 sychronized 機制更高的效能和可伸縮性,源於JDK 1.5提供的原子變數(如AtomicInteger,AtomicReference),這些原子變數類可以構建高效的非阻塞演算法,底層實現是CAS。
CAS(compare and swap)是一種高效實現執行緒安全性的方法,支援原子更新操作,適用於實現計數器,序列發生器等場景,比如線上程池新增worker執行緒的時候,需要增加計數,因為i++並非一個原子操作,所以可以使用 AtomicInteger
實現安全加1的操作。
// java.util.concurrent.ThreadPoolExecutor /** * Attempts to CAS-increment the workerCount field of ctl. */ private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); }
CAS和傳統的加鎖方式(sychronized, ReentrantLock等)相比,CAS是一種樂觀方式(對比資料庫的悲觀、樂觀鎖),無鎖(lock-free),爭用失敗的執行緒不會被阻塞掛起,CAS失敗時由我們決定是繼續嘗試,還是執行其他操作。當然這裡的無鎖只是上層我們感知的無鎖,其實底層仍然是有加鎖行為的,後面會看到。
此外,CAS存在ABA問題,可以看下 AtomicStampedReference
,內部封裝的是[reference, integer]。
接下來跟蹤下原始碼。
CAS底層實現
從 AtomicInteger 入手,其中的屬性 valueOffset 是該物件的 value 在記憶體中的起始地址。
public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } // setup to use Unsafe.compareAndSwapInt for updates private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value;
JDK中CAS的API都封裝在 sun.misc.Unsafe
這個類中。接下來進入 openjdk 中對應的方法,位置 hotspot\src\share\vm\prims\unsafe.cpp
。
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) UnsafeWrapper("Unsafe_CompareAndSwapInt"); // 從偏移 obj 得到int的地址 addr oop p = JNIHandles::resolve(obj); jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); return (jint)(Atomic::cmpxchg(x, addr, e)) == e; UNSAFE_END
然後繼續 hotspot\src\share\vm\runtime\atomic.cpp
。
jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte* dest, jbyte compare_value) { assert(sizeof(jbyte) == 1, "assumption."); uintptr_t dest_addr = (uintptr_t)dest; uintptr_t offset = dest_addr % sizeof(jint); volatile jint* dest_int = (volatile jint*)(dest_addr - offset); jint cur = *dest_int; jbyte* cur_as_bytes = (jbyte*)(&cur); jint new_val = cur; jbyte* new_val_as_bytes = (jbyte*)(&new_val); new_val_as_bytes[offset] = exchange_value; // 直到更新成功 while (cur_as_bytes[offset] == compare_value) { // 進入 jint res = cmpxchg(new_val, dest_int, cur); if (res == cur) break; cur = res; new_val = cur; new_val_as_bytes[offset] = exchange_value; } return cur_as_bytes[offset]; }
如果是Linux x86架構的話,底層實現是 hotspot\src\os_cpu\linux_x86\vm\atomic_linux_x86.inline.hpp
:
// Adding a lock prefix to an instruction on MP machine #define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: " inline jintAtomic::cmpxchg(jintexchange_value, volatile jint*dest, jintcompare_value) { int mp = os::is_MP(); __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)" : "=a" (exchange_value) : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp) : "cc", "memory"); return exchange_value; }
x86中對應的機器指令是 CMPXCHG ,如果是多處理器架構的話會有 LOCK 字首。
參考
- 《Java併發程式設計實戰》
- ofollow,noindex">compare and swap
- x86 LOCK 指令