深入剖析 Java 7 中的 HashMap 和 ConcurrentHashMap
本文將深入剖析 Java7 中的 HashMap 和 ConcurrentHashMap 的原始碼,解析 HashMap 執行緒不安全的原理以及解決方案,最後以測試用例加以驗證。
1 Java7 HashMap
HashMap 的資料結構:
從上圖中可以看出,HashMap 底層就是一個數組結構,陣列中的每一項又是一個連結串列。
通過檢視 JDK 中的 HashMap 原始碼,可以看到其建構函式有一行程式碼:
public HashMap(int initialCapacity, float loadFactor) { ... table = new Entry[capacity]; ... }
即建立了一個大小為 capacity 的 Entry 陣列,而 Entry 的結構如下:
static class Entry<K,V> implements Map.Entry<K,V> { final K key; V value; Entry<K,V> next; final int hash; …… }
可以看到,Entry 是一個 static class,其中包含了 key 和 value ,也就是鍵值對,另外還包含了一個 next 的 Entry 指標。
- capacity:當前陣列容量,始終保持 2^n,可以擴容,擴容後陣列大小為當前的 2 倍。預設初始容量為 16。
- loadFactor:負載因子,預設為 0.75。
- threshold:擴容的閾值,等於 capacity * loadFactor
1.1 put過程分析
public V put(K key, V value) { // 當插入第一個元素的時候,需要先初始化陣列大小 if (table == EMPTY_TABLE) { inflateTable(threshold); } // 如果 key 為 null,則這個 entry 放到 table[0] 中 if (key == null) return putForNullKey(value); // key 的 hash 值 int hash = hash(key); // 找到對應的陣列下標 int i = indexFor(hash, table.length); // 遍歷一下對應下標處的連結串列,看是否有重複的 key 已經存在, // 如果有,直接覆蓋,put 方法返回舊值就結束了 for (Entry<K,V> e = table[i]; e != null; e = e.next) { Object k; if (e.hash == hash && ((k = e.key) == key || key.equals(k))) { V oldValue = e.value; e.value = value; e.recordAccess(this); return oldValue; } } modCount++; // 不存在重複的 key,將此 entry 新增到連結串列中 addEntry(hash, key, value, i); return null; }
這裡對一些方法做深入解析。
- 陣列初始化
private void inflateTable(int toSize) { // 保證陣列大小一定是 2^n int capacity = roundUpToPowerOf2(toSize); // 計算擴容閾值 threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1); // 初始化陣列 table = new Entry[capacity]; initHashSeedAsNeeded(capacity); }
- 找到對應的陣列下標
static int indexFor(int hash, int length) { // 作用等價於取模運算,但這種方式效率更高 return hash & (length-1); }
因為HashMap的底層陣列長度總是 2^n,當 length 為 2 的 n 次方時,hash & (length-1) 就相當於對length取模,而且速度比直接取模要快的多。
- 新增節點到連結串列中
void addEntry(int hash, K key, V value, int bucketIndex) { // 如果當前 HashMap 大小已經達到了閾值,並且新值要插入的陣列位置已經有元素了,那麼要擴容 if ((size >= threshold) && (null != table[bucketIndex])) { // 擴容 resize(2 * table.length); // 重新計算 hash 值 hash = (null != key) ? hash(key) : 0; // 計算擴容後的新的下標 bucketIndex = indexFor(hash, table.length); } createEntry(hash, key, value, bucketIndex); } // 永遠都是在連結串列的表頭新增新元素 void createEntry(int hash, K key, V value, int bucketIndex) { // 獲取指定 bucketIndex 索引處的 Entry Entry<K,V> e = table[bucketIndex]; // 將新建立的 Entry 放入 bucketIndex 索引處,並讓新的 Entry 指向原來的 Entry table[bucketIndex] = new Entry<>(hash, key, value, e); size++; }
當系統決定儲存 HashMap 中的 key-value 對時,完全沒有考慮 Entry 中的 value,僅僅只是 根據 key 來計算並決定每個 Entry 的儲存位置 。我們完全可以把 Map 集合中的 value 當成 key 的附屬,當系統決定了 key 的儲存位置之後,value 隨之儲存在那裡即可。
- 陣列擴容
隨著 HashMap 中元素的數量越來越多,發生碰撞的概率將越來越大,所產生的子鏈長度就會越來越長,這樣勢必會影響 HashMap 的存取速度。為了保證 HashMap 的效率,系統必須要在某個臨界點進行擴容處理,該臨界點 threshold。而在 HashMap 陣列擴容之後,最消耗效能的點就出現了:原陣列中的資料必須重新計算其在新陣列中的位置,並放進去,這就是 resize。
void resize(int newCapacity) { Entry[] oldTable = table; int oldCapacity = oldTable.length; // 若 oldCapacity 已達到最大值,直接將 threshold 設為 Integer.MAX_VALUE if (oldCapacity == MAXIMUM_CAPACITY) { threshold = Integer.MAX_VALUE; return; // 直接返回 } // 否則,建立一個更大的陣列 Entry[] newTable = new Entry[newCapacity]; //將每條Entry重新雜湊到新的陣列中 transfer(newTable, initHashSeedAsNeeded(newCapacity)); table = newTable; // 重新設定 threshold threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1); }
1.2 get過程分析
public V get(Object key) { // key 為 null 的話,會被放到 table[0],所以只要遍歷下 table[0] 處的連結串列就可以了 if (key == null) return getForNullKey(); // key 非 null 的情況,詳見下文 Entry<K,V> entry = getEntry(key); return null == entry ? null : entry.getValue(); } final Entry<K,V> getEntry(Object key) { // The number of key-value mappings contained in this map. if (size == 0) { return null; } // 根據該 key 的 hashCode 值計算它的 hash 碼 int hash = (key == null) ? 0 : hash(key); // 確定陣列下標,然後從頭開始遍歷連結串列,直到找到為止 for (Entry<K,V> e = table[indexFor(hash, table.length)]; e != null; e = e.next) { Object k; //若搜尋的key與查詢的key相同,則返回相對應的value if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) return e; } return null; }
2 Java7 ConcurrentHashMap
ConcurrentHashMap 的成員變數中,包含了一個 Segment 陣列 final Segment<K,V>[] segments;,而 Segment 是ConcurrentHashMap 的內部類。
然後在 Segment 這個類中,包含了一個 HashEntry 的陣列transient volatile HashEntry<K,V>[] table,而 HashEntry 也是 ConcurrentHashMap 的內部類。
HashEntry 中,包含了 key 和 value 以及 next 指標(類似於 HashMap 中的 Entry),所以 HashEntry 可以構成一個連結串列。
2.1 成員變數及建構函式
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { ... //初始的容量 static final int DEFAULT_INITIAL_CAPACITY = 16; //初始的載入因子 static final float DEFAULT_LOAD_FACTOR = 0.75f; //初始的併發等級,表示當前更新執行緒的估計數 static final int DEFAULT_CONCURRENCY_LEVEL = 16; //最大容量 static final int MAXIMUM_CAPACITY = 1 << 30; //最小的segment數量 static final int MIN_SEGMENT_TABLE_CAPACITY = 2; //最大的segment數量 static final int MAX_SEGMENTS = 1 << 16; // static final int RETRIES_BEFORE_LOCK = 2; // segments 的掩碼值, key 的雜湊碼的高位用來選擇具體的 segment final int segmentMask; // 偏移量 final int segmentShift; final Segment<K,V>[] segments; ... // 建立一個帶有指定初始容量、載入因子和併發級別的新的空對映 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // 尋找最佳匹配引數(不小於給定引數的最接近的 2^n) int sshift = 0; // 用來記錄向左按位移動的次數 int ssize = 1; // 用來記錄Segment陣列的大小 // 計算並行級別 ssize,因為要保持並行級別是 2^n while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } // 若為預設值,concurrencyLevel 為 16,sshift 為 4 // 那麼計算出 segmentShift 為 28,segmentMask 為 15 this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; // 記錄每個 Segment 上要放置多少個元素 int c = initialCapacity / ssize; // 假如有餘數,則Segment數量加1 if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
當用 new ConcurrentHashMap() 無參建構函式進行初始化的,那麼初始化完成後:
- Segment 陣列長度為 16,不可以擴容
- Segment[i] 的預設大小為 2,負載因子是 0.75,得出初始閾值為 1.5,也就是以後插入第一個元素不會觸發擴容,插入第二個會進行第一次擴容
- 這裡初始化了 segment[0],其他位置還是 null,至於為什麼要初始化 segment[0],後面的程式碼會介紹
- 當前 segmentShift 的值為 32 – 4 = 28,segmentMask 為 16 – 1 = 15,姑且把它們簡單翻譯為移位數和掩碼,這兩個值馬上就會用到
2.2 put過程分析
根據 hash 值很快就能找到相應的 Segment,之後就是 Segment 內部的 put 操作。
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); // 根據 hash 值找到 Segment 陣列中的位置 j // hash 是 32 位,無符號右移 segmentShift(28) 位,剩下低 4 位, // 然後和 segmentMask(15) 做一次與操作,也就是說 j 是 hash 值的最後 4 位,也就是槽的陣列下標 int j = (hash >>> segmentShift) & segmentMask; // 剛剛說了,初始化的時候初始化了 segment[0],但是其他位置還是 null, // ensureSegment(j) 對 segment[j] 進行初始化 if ((s = (Segment<K,V>)UNSAFE.getObject// nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) //in ensureSegment s = ensureSegment(j); // 插入新值到 槽 s 中 return s.put(key, hash, value, false); }
Segment 內部是由 陣列+連結串列 組成的。
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 先獲取該 segment 的獨佔鎖 // 每一個Segment進行put時,都會加鎖 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { // segment 內部的陣列 HashEntry<K,V>[] tab = table; // 利用 hash 值,求應該放置的陣列下標 int index = (tab.length - 1) & hash; // 陣列該位置處的連結串列的表頭 HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { // 如果鏈頭不為 null if (e != null) { K k; //如果在該鏈中找到相同的key,則用新值替換舊值,並退出迴圈 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } //如果沒有和key相同的,一直遍歷到鏈尾,鏈尾的next為null,進入到else e = e.next; } else { // node 到底是不是 null,這個要看獲取鎖的過程,不過和這裡都沒有關係。 // 如果不為 null,那就直接將它設定為連結串列表頭;如果是null,初始化並設定為連結串列表頭。 if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; // 如果超過了該 segment 的閾值,這個 segment 需要擴容 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else // 沒有達到閾值,將 node 放到陣列 tab 的 index 位置, // 其實就是將新的節點設定成原連結串列的表頭 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { // 解鎖 unlock(); } return oldValue; }
2.3 初始化Segment
ConcurrentHashMap 初始化的時候會初始化第一個槽 segment[0],對於其他槽來說,在插入第一個值的時候進行初始化。
這裡需要考慮併發,因為很可能會有多個執行緒同時進來初始化同一個槽 segment[k],不過只要有一個成功了就可以。
private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 這裡看到為什麼之前要初始化 segment[0] 了, // 使用當前 segment[0] 處的陣列長度和負載因子來初始化 segment[k] // 為什麼要用“當前”,因為 segment[0] 可能早就擴容過了 Segment<K,V> proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); // 初始化 segment[k] 內部的陣列 HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck Segment[k] 是否被其它執行緒初始化了 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); // 使用 while 迴圈,內部用 CAS,當前執行緒成功設值或其他執行緒成功設值後,退出 while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
2.4 get過程分析
比較簡單,先找到 Segment 陣列的位置,然後找到 HashEntry 陣列的位置,最後順著連結串列查詢即可。
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }
3 執行緒不安全
3.1 雜湊碰撞
多個執行緒同時使用 put() 方法新增元素,若存在兩個或多個 put() 的 key 發生了碰撞,那麼有可能其中一個執行緒的資料被覆蓋。
3.2 擴容
當資料要插入 HashMap 時,都會檢查容量有沒有超過設定的 thredhold,如果超過,則需要擴容。而多執行緒會導致擴容後的連結串列形成環形資料結構,一旦形成環形資料結構,Entry 的 next 的節點永遠不為 null,就會在獲取 Entry 時產生死迴圈。
例子可見文章《HashMap多執行緒死迴圈問題》。
不過要注意,其使用的 Java 版本既不是 7,也不是 8。在 Java7 中方法 addEntry() 新增節點到連結串列中是先擴容後再新增,而例子中的原始碼是:
void addEntry(int hash, K key, V value, int bucketIndex) { Entry<K,V> e = table[bucketIndex]; // 先新增節點 table[bucketIndex] = new Entry<K,V>(hash, key, value, e); // 然後擴容 if (size++ >= threshold) resize(2 * table.length); }
所以在 Java7 中此例子無效。而在 Java8 中,通過確保建新鏈與舊鏈的順序是相同的,即可避免產生死迴圈。
4 HashMap遍歷方式
import java.util.HashMap; import java.util.Iterator; import java.util.Map; public class HashMapTest { private final static Map<Integer, Object> map = new HashMap<Integer, Object>(10000); private static final Object PRESENT = new Object(); public static void main(String args[]) { long startTime; long endTime; long totalTime; for (int i = 0; i < 7500; i++) { map.put(i, PRESENT); } // 方法一 startTime = System.nanoTime(); Iterator iter1 = map.entrySet().iterator(); while (iter1.hasNext()) { Map.Entry<Integer, Object> entry = (Map.Entry) iter1.next(); Integer key = entry.getKey(); Object val = entry.getValue(); } endTime = System.nanoTime(); totalTime = endTime - startTime; System.out.println("methor1 pays " + totalTime + " ms"); // 方法二 startTime = System.nanoTime(); Iterator iter2 = map.keySet().iterator(); while (iter2.hasNext()) { Object key = iter2.next(); Object val = map.get(key); } endTime = System.nanoTime(); totalTime = endTime - startTime; System.out.println("methor2 pays " + totalTime + " ms"); } }
執行結果:
5 效能對比
執行緒安全的使用 HashMap 有三種方式,分別為 Hashtable、SynchronizedMap()、ConcurrentHashMap。
Hashtable
使用 synchronized 來保證執行緒安全,幾乎所有的 public 的方法都是 synchronized 的,而有些方法也是在內部通過 synchronized 程式碼塊來實現。
synchronizedMap()
通過建立一個執行緒安全的 Map 物件,並把它作為一個封裝的物件來返回。
ConcurrentHashMap
支援多執行緒對 Map 做讀操作,並且不需要任何的 blocking 。這得益於 CHM 將 Map 分割成了不同的部分,在執行更新操作時只鎖住一部分。根據預設的併發級別, Map 被分割成 16 個部分,並且由不同的鎖控制。這意味著,同時最多可以有 16個 寫執行緒操作 Map 。試想一下,由只能一個執行緒進入變成同時可由 16 個寫執行緒同時進入(讀執行緒幾乎不受限制),效能的提升是顯而易見的。但由於一些更新操作,如 put(), remove(), putAll(), clear()只鎖住操作的部分,所以在檢索操作不能保證返回的是最新的結果。
在迭代遍歷 CHM 時, keySet 返回的 iterator 是弱一致和 fail-safe 的,可能不會返回某些最近的改變,並且在遍歷過程中,如果已經遍歷的陣列上的內容變化了,不會丟擲 ConcurrentModificationExceptoin 的異常。
什麼時候使用 ConcurrentHashMap ?
CHM 適用於讀者數量超過寫者時,當寫者數量大於等於讀者時,CHM 的效能是低於 Hashtable 和 synchronizedMap 的。這是因為當鎖住了整個 Map 時,讀操作要等待對同一部分執行寫操作的執行緒結束。
CHM 適用於做 cache ,在程式啟動時初始化,之後可以被多個請求執行緒訪問。
CHM 是Hashtable一個很好的替代,但要記住, CHM 的比 Hashrable 的同步性稍弱。
6 拓展:Java8 HashMap & ConcurrentHashMap
Java8 對 HashMap 和 ConcurrentHashMap 做了一些修改:
- 二者均利用了紅黑樹,所以其資料結構由 陣列 + 連結串列 + 紅黑樹 組成。我們知道,連結串列上的資料需要一個一個比較下去才能找到我們需要的,時間複雜度取決於連結串列的長度,為 O(n)。為了降低這一部分的開銷,在 Java8 中,當連結串列中的元素超過了 8 個以後,會將連結串列轉換為紅黑樹,這個時候時間複雜度就降為了 O(logN)
- Java8 中 ConcurrentHashMap 摒棄 Java7 中的 Segment 的概念,使用了另一種方式實現保證執行緒安全。
Linux公社的RSS地址: ofollow,noindex" target="_blank">https://www.linuxidc.com/rssFeed.aspx
本文永久更新連結地址: https://www.linuxidc.com/Linux/2018-09/154133.htm