本文是建立在上篇《HashMap源碼分析》的基礎上。其中的一些重複的方法和知識點不會再贅述。有疑惑的同學可以移步到上一篇文章。依舊以jdk1.8源碼為基礎來講解ConcurrentHashMap。它的大體結構與HashMap相同,table容量同樣要求是2的冪次。
HashMap高效快捷,但不安全,特別是2020年,安全很重要。目前市面上有3種提供安全的map的方式,分別是
- hashtable:相對古老的線程安全機制。任一時間只有一個線程能寫操作。現在基本被效率更高的ConcurrentHashMap替代。
- synchronizedMap:Collections中的內部類,可以把普通的map轉成線程安全的map。原理就是在操作對象上加synchronized。
- ConcurrentHashMap:線程安全的HashMap。
如何做到線程安全
多線程並髮帶來的問題目前總體有2種解決機制。
- 悲觀機制:認為最壞的結果肯定發生,從開始就設置一定規則最大限度減少發生機率,有點以防萬一、未雨綢繆的意思。比如安全套,肯定不會每次都中,可萬一呢?並發的悲觀實現一般採用鎖,java中的synchronized關鍵字也被廣泛應用在多線程並發的場景中。但是鎖會降低程序性能。
- 樂觀機制:認為結果總是好的,先幹了再說,不行再想辦法:重試,補救,版本號控制等。意外懷孕的痴男怨女們可能就太樂觀了,事後只能採取補救措施。CAS就是樂觀機制。
ConcurrentHashMap中主要採用的CAS+自旋,改成功就改,改不成功繼續試(自旋)。也有synchronized配合使用。
CAS & Unsafe
CAS的全稱是Compare And Swap,即比較交換,它也是JUC的基礎。我們只是簡單介紹它的原理,細節問題需要同學們另行研究。
/*
* v-表示要更新的變量,e-表示預期值,n-新值。
* 方法的目的是給變量v修改值。
* 如果變量v的值和預期值e相等就把v的值改成n,返回true,如果不相等就返回false,有其他線程修改該值。
* 這裡可能出現ABA問題(從A變成B又變回A,造成變量沒變得假象),但java做了很多優化。可以忽略不計。
*/
boolean CAS(v,e,n)
cas流程很好理解,但在多cpu多線程的情況下會不會不安全,放心安全。java的cas其實是通過Unsafe類方法調用cpu的一條cas原子指令。作業系統本身是對內存操作做了線程安全的,篇幅太少也說不清楚,這裡大家可以自行研究一下JMM,JMM不是本文重點。這裡只要知道結論就是CAS可以保證原子性。不過它只提供單個變量的原子性,多個變量的原子性還需要藉助synchronized。
Unsafe是java里的另類,java有個特點是安全,並不允許程式設計師直接操作內存,而Unsafe類可以,才有條件執行CAS方法。但是不建議大家使用Unsafe.class,因為不安全,sun公司有計劃取消Unsafe。
源碼解析
sizeCtl & constructor
ConcurrentHashMap和HashMap在各自的構造函數中都沒有做數組初始化,初始化放在了第一次添加元素中。值得注意的是ConcurrentHashMap中有一個屬性sizeCtl特別重要,理清楚它的變化,也就理解了整個Map源碼的流程。下面是它的說明
/**
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
* <p>
* 控制標識符,用來控制table的初始化和擴容的操作,不同的值有不同的含義
* <p>
* 1. 當為負數時:-1代表正在初始化,-N代表有N-1個線程正在進行擴容
* <p>
* 2.當為0時:代表當時的table還沒有被初始化
* <p>
* 3.當為正數時:未初始化表示的是初始化數組的初始容量,如果已經初始化,
* 記錄的是擴容的閾值(達到閾值進行擴容)
*/
private transient volatile int sizeCtl;
再看一下ConcurrentHashMap帶初始化容量的代碼
/**
* Creates a new, empty map with an initial table size
* accommodating the specified number of elements without the need
* to dynamically resize.
*
* @param initialCapacity The implementation performs internal
* sizing to accommodate this many elements.
* @throws IllegalArgumentException if the initial capacity of
* elements is negative
*
* 此時sizeCtl記錄的就是數組的初始化容量
*
* 比如initialCapacity=5
* 調用tableSizeFor(5+5/2+1)==tableSizeFor(8)
*/
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
/**
* Returns a power of two table size for the given desired capacity.
* See Hackers Delight, sec 3.2
* 返回一個大於等於c的2的冪次方數
*
* 當c=8時
* n = c-1=7
* 接下來驗算最終結果
* 0000 0000 0000 0000 0000 0000 0000 0111
* >>> 1
* = 0000 0000 0000 0000 0000 0000 0000 0011
* | 0000 0000 0000 0000 0000 0000 0000 0111
* = 0000 0000 0000 0000 0000 0000 0000 0111
* >>> 2
* = 0000 0000 0000 0000 0000 0000 0000 0001
* | 0000 0000 0000 0000 0000 0000 0000 0111
* = 0000 0000 0000 0000 0000 0000 0000 0111
* >>> 4
* = 0000 0000 0000 0000 0000 0000 0000 0000
* | 0000 0000 0000 0000 0000 0000 0000 0111
* = 0000 0000 0000 0000 0000 0000 0000 0111
* 下面再 >>> 8 和 >>> 16後的二進位都是0
* 所以最終結果就是111,也就是7最後返回結果再+1,等於8
*
* 總結 右移一共1+2+4+8+16=31位,和與之對應 | 運算
* 最終把n的二進位中所有1移到低位。新的數高位都是0,低位都是1。這樣格式的數在HashMap中提到過,就是2的冪次-1。
* 最後結果是這個數+1,那就是2的冪次。
*/
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
當我們new ConcurrentHashMap(c) 時,初始化容量並不是c,而是一個大於等於c的2的冪次方數。我們利用發射來驗證下
public static void main(String[] args) {
ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(5);
Class clazz = concurrentHashMap.getClass();
try {
Field field = clazz.getDeclaredField("sizeCtl");
//打開私有訪問
field.setAccessible(true);
//獲取屬性
String name = field.getName();
//獲取屬性值
Object value = field.get(concurrentHashMap);
System.out.println("ConcurrentHashMap的初始容量為:= "+value);
} catch (Exception e) {
e.printStackTrace();
}
}
--列印結果是: Map的初始容量=8
put & putVal
/**
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
*
* <p>The value can be retrieved by calling the {@code get} method
* with a key that is equal to the original key.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value associated with {@code key}, or
* {@code null} if there was no mapping for {@code key}
* @throws NullPointerException if the specified key or value is null
*/
@Override
public V put(K key, V value) {
return putVal(key, value, false);
}
/**
* Implementation for put and putIfAbsent
*
*/
final V putVal(K key, V value, boolean onlyIfAbsent) {
//如果有空值或者空鍵,直接拋異常
if (key == null || value == null) {
throw new NullPointerException();
}
//兩次hash,減少hash衝突,可以均勻分布
int hash = spread(key.hashCode());
int binCount = 0;
//疊代當前table
for (Node<K, V>[] tab = table; ; ) {
Node<K, V> f;
int n, i, fh;
//1. 如果table未初始化,先初始化
if (tab == null || (n = tab.length) == 0) {
tab = initTable();
}
//如果i位置沒有數據,cas插入
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//cas和外側else if條件形成雙保險,保證數據安全
if (casTabAt(tab, i, null,
new Node<K, V>(hash, key, value, null))) {
break; // no lock when adding to empty bin
}
}
//2. hash值是MOVED表示數組正在擴容,則協助擴容,先擴容在新加元素
else if ((fh = f.hash) == MOVED) {
tab = helpTransfer(tab, f);
} else {
//hash計算的bucket不為空,且當前沒有處於擴容操作,進行元素添加
V oldVal = null;
//對當前bucket進行加鎖,保證線程安全,執行元素添加操作
synchronized (f) {
//判斷是否為f,防止它變成tree
if (tabAt(tab, i) == f) {
//hash值>=0 表示該節點是鍊表結構
if (fh >= 0) {
binCount = 1;
//e記錄的是頭節點
for (Node<K, V> e = f; ; ++binCount) {
K ek;
//相同的key進行put就會覆蓋原先的value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K, V> pred = e;
if ((e = e.next) == null) {
//插入鍊表尾部
pred.next = new Node<K, V>(hash, key,
value, null);
break;
}
}
} else if (f instanceof TreeBin) {
Node<K, V> p;
binCount = 2;
//紅黑樹結構旋轉插入
if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent) {
p.val = value;
}
}
}
}
}
if (binCount != 0) {
//鍊表長度大於8時轉換紅黑樹
if (binCount >= TREEIFY_THRESHOLD) {
treeifyBin(tab, i);
}
if (oldVal != null) {
return oldVal;
}
break;
}
}
}
//統計size,並且檢查是否需要擴容
addCount(1L, binCount);
return null;
}
putVal()總體是自旋+CAS的方式,流程和HashMap一樣。
- 自旋:如果table==null,調用initTable()初始化如果沒有hash碰撞就CAS添加如果正在擴容就協助擴容如果存在hash碰撞,如果是單向列表就插到bucket尾部,如果是紅黑樹就插入數結構如果鍊表bucket長度大於8,轉紅黑樹如果添加成功就調用addCount()方法統計size,檢查是否需要擴容
從源碼中可以看到put新元素時,如果發生hash衝突,先鎖定發生衝突的bucket,不影響其他bucket操作,達到並發安全且高效的目的。下面是`putVal
initTable,初始化table
/**
* Initializes table, using the size recorded in sizeCtl.
* 初始化table,從新記錄sizeCtl值,此時值為數組下次擴容的閾值
*/
private final Node<K, V>[] initTable() {
Node<K, V>[] tab;
int sc;
//再次判斷空的table才能進入初始化操作
while ((tab = table) == null || tab.length == 0) {
// sizeCtl<0,也就是下面elseif把sizeCtl設置成-1. 表示其他線程已經在初始化了或者擴容了,掛起當前線程,自旋等待
if ((sc = sizeCtl) < 0) {
Thread.yield();
//CAS設置SIZECTL為-1,如果設置成功繼續執行下面操作,如果失敗,說明此時有其他線程正在執行操作,繼續自旋
} else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//double check,保證線程安全,可能有線程已經同步完了
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
table = tab = nt;
//記錄下次擴容的大小,相當於n-n/4=0.75n
sc = n - (n >>> 2);
}
} finally {
//此時sizeCtl的值為下次擴容的閾值
sizeCtl = sc;
}
break;
}
}
return tab;
}
helpTransfer 協助擴容
/**
* Helps transfer if a resize is in progress.
* <p>
* 如果數組正在擴容,協助之,多個工作線程一起擴容
* 從舊的table的元素複製到新的table中
*
*/
final Node<K, V>[] helpTransfer(Node<K, V>[] tab, Node<K, V> f) {
Node<K, V>[] nextTab;
int sc;
//如果f是ForwardingNode,說明f正在擴容,hash值已經被標為MOVED。
//ForwardingNode.nextTable就是新table不為空
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K, V>) f).nextTable) != null) {
//根據 length 得到一個前16位的標識符,數組容量大小。
int rs = resizeStamp(tab.length);
//多重條件判斷未擴容完成,還在進行中,新老數組都沒有變,且sizeCtl<0
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 1. sizeCtl 無符號右移16位獲得高16位如果不等 rs 標識符變了
// 2. (sc == rs + 1),表示擴容結束
// 3. (sc == rs + MAX_RESIZERS)達到了最大幫助線程個數 65535個
// 4. transferIndex<= 0 也表示擴容已經結束
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//增加一個線程幫助擴容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
TreeNode結構
/**
* Nodes for use in TreeBins
*/
static final class TreeNode<K, V> extends Node<K, V> {
TreeNode<K, V> parent; // red-black tree links
TreeNode<K, V> left;
TreeNode<K, V> right;
TreeNode<K, V> prev; // needed to unlink next upon deletion
boolean red;
TreeNode(int hash, K key, V val, Node<K, V> next,
TreeNode<K, V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
}
TreeNode繼承了Node,又多了prev等,這裡很少人注意,其實它在維護紅黑樹的同時也維護了雙向列表。雖然紅黑樹查詢方便,但遷移真的好難,藉助雙向列表做遷移會容易很多。
transfer 單向列表擴容
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
* 多線程擴容操作
*/
private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) {
int n = tab.length, stride;
//數組遷移分塊執行,每核處理的bucket量小於16個,則強制賦值16,
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) {
stride = MIN_TRANSFER_STRIDE; // subdivide range
}
//如果是擴容線程,此時新數組為null
if (nextTab == null) { // initiating
try {
//構建新數組,其容量為原來容量的2倍
@SuppressWarnings("unchecked")
Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//記錄線程開始遷移的bucket,從後往前遷移
transferIndex = n;
}
int nextn = nextTab.length;
//已經遷移的桶位,會用fwd占位(這個節點的hash值為MOVED),這個在put方法中見到過
ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab);
// 當advance == true時,表明該節點已經處理過了
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0; ; ) {
Node<K, V> f;
int fh;
//計算每一個線程負責哪部分,遷移以後賦fwd節點
//i記錄當前正在遷移桶位的索引值
//bound記錄下一次任務遷移的開始桶位
//--i>=bound 表示當前線程分配的遷移任務還沒有完成
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing) {
advance = false;
//沒有元素需要遷移
} else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
} else if (U.compareAndSwapInt // 用CAS計算得到下一次任務遷移的開始桶位,值值給transferIndex
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//沒有更多的需要遷移的bucket
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 擴容結束後,table指向新數組,重新計算擴容閾值,賦值給sizeCtl
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 擴容任務線程數減1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//判斷當前所有擴容任務是否執行完成,相等表明完成
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {
return;
}
finishing = advance = true;
i = n; // recheck before commit
}
} else if ((f = tabAt(tab, i)) == null) { //當前節點為null,在該位置添加一個ForwardingNode
advance = casTabAt(tab, i, null, fwd);
} else if ((fh = f.hash) == MOVED) {//如果是ForwardingNode,說明已經擴容過
advance = true; // already processed
} else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K, V> ln, hn;
// fh >= 0 ,表示為鍊表節點
if (fh >= 0) {
// 構造兩個鍊表,一個是原鍊表,另一個是原鍊表的反序排列
int runBit = fh & n;
Node<K, V> lastRun = f;
for (Node<K, V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
} else {
hn = lastRun;
ln = null;
}
for (Node<K, V> p = f; p != lastRun; p = p.next) {
int ph = p.hash;
K pk = p.key;
V pv = p.val;
if ((ph & n) == 0) {
ln = new Node<K, V>(ph, pk, pv, ln);
} else {
hn = new Node<K, V>(ph, pk, pv, hn);
}
}
// 先擴容再插入相應值
//新table的i位置添加元素
setTabAt(nextTab, i, ln);
//新table的i+1位置添加元素
setTabAt(nextTab, i + n, hn);
// 舊table i 位置處插上ForwardingNode,表示該節點已經處理過
setTabAt(tab, i, fwd);
advance = true;
// 紅黑樹處理邏輯,實質上是維護雙向鍊表
} else if (f instanceof TreeBin) {
TreeBin<K, V> t = (TreeBin<K, V>) f;
TreeNode<K, V> lo = null, loTail = null;
TreeNode<K, V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K, V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K, V> p = new TreeNode<K, V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null) {
lo = p;
} else {
loTail.next = p;
}
loTail = p;
++lc;
} else {
if ((p.prev = hiTail) == null) {
hi = p;
} else {
hiTail.next = p;
}
hiTail = p;
++hc;
}
}
// 擴容後紅黑樹節點個數若<=6,將樹轉單向鍊表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K, V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K, V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
總結
ConcurrentHashMap設計之精妙驚為天人,不愧為大師之作。有3個點可能之前沒有注意
- 在new ConcurrentHashMap(c)時,初始容量並不是傳入的值。而是一個大於等於該值的2的冪次方值
- 世人都知道鍊表大於6的時候會轉紅黑樹,卻很少有人提及在紅黑樹節點個數小於等於6時會轉成鍊表
- ConcurrentHashMap和HashMap的數據結構嚴格的說應該是數組+單向列表+(紅黑樹+雙向鍊表)本人水平有限,文中難免會有謬誤,還請各位指出。參考jdk1.8 & jdk1.7#ConcurrentHashMap源碼黑馬程式設計師公開課