ConcurrentHashMap源碼解析,含6大核心方法

程序員讀書俱樂部 發佈 2021-08-03T07:46:28.889101+00:00

本文是建立在上篇《HashMap源碼分析》的基礎上。其中的一些重複的方法和知識點不會再贅述。有疑惑的同學可以移步到上一篇文章。依舊以jdk1.8源碼為基礎來講解ConcurrentHashMap。它的大體結構與HashMap相同,table容量同樣要求是2的冪次。

本文是建立在上篇《HashMap源碼分析》的基礎上。其中的一些重複的方法和知識點不會再贅述。有疑惑的同學可以移步到上一篇文章。依舊以jdk1.8源碼為基礎來講解ConcurrentHashMap。它的大體結構與HashMap相同,table容量同樣要求是2的冪次。

HashMap高效快捷,但不安全,特別是2020年,安全很重要。目前市面上有3種提供安全的map的方式,分別是

  1. hashtable:相對古老的線程安全機制。任一時間只有一個線程能寫操作。現在基本被效率更高的ConcurrentHashMap替代。
  2. synchronizedMap:Collections中的內部類,可以把普通的map轉成線程安全的map。原理就是在操作對象上加synchronized。
  3. ConcurrentHashMap:線程安全的HashMap。

如何做到線程安全

多線程並髮帶來的問題目前總體有2種解決機制。

  1. 悲觀機制:認為最壞的結果肯定發生,從開始就設置一定規則最大限度減少發生機率,有點以防萬一、未雨綢繆的意思。比如安全套,肯定不會每次都中,可萬一呢?並發的悲觀實現一般採用鎖,java中的synchronized關鍵字也被廣泛應用在多線程並發的場景中。但是鎖會降低程序性能。
  2. 樂觀機制:認為結果總是好的,先幹了再說,不行再想辦法:重試,補救,版本號控制等。意外懷孕的痴男怨女們可能就太樂觀了,事後只能採取補救措施。CAS就是樂觀機制。

ConcurrentHashMap中主要採用的CAS+自旋,改成功就改,改不成功繼續試(自旋)。也有synchronized配合使用。

CAS & Unsafe

CAS的全稱是Compare And Swap,即比較交換,它也是JUC的基礎。我們只是簡單介紹它的原理,細節問題需要同學們另行研究。

  /*
   * v-表示要更新的變量,e-表示預期值,n-新值。
   * 方法的目的是給變量v修改值。
   * 如果變量v的值和預期值e相等就把v的值改成n,返回true,如果不相等就返回false,有其他線程修改該值。
   * 這裡可能出現ABA問題(從A變成B又變回A,造成變量沒變得假象),但java做了很多優化。可以忽略不計。
  */
  boolean CAS(v,e,n) 

cas流程很好理解,但在多cpu多線程的情況下會不會不安全,放心安全。java的cas其實是通過Unsafe類方法調用cpu的一條cas原子指令。作業系統本身是對內存操作做了線程安全的,篇幅太少也說不清楚,這裡大家可以自行研究一下JMM,JMM不是本文重點。這裡只要知道結論就是CAS可以保證原子性。不過它只提供單個變量的原子性,多個變量的原子性還需要藉助synchronized。

Unsafe是java里的另類,java有個特點是安全,並不允許程式設計師直接操作內存,而Unsafe類可以,才有條件執行CAS方法。但是不建議大家使用Unsafe.class,因為不安全,sun公司有計劃取消Unsafe。

源碼解析

sizeCtl & constructor

ConcurrentHashMap和HashMap在各自的構造函數中都沒有做數組初始化,初始化放在了第一次添加元素中。值得注意的是ConcurrentHashMap中有一個屬性sizeCtl特別重要,理清楚它的變化,也就理解了整個Map源碼的流程。下面是它的說明

   
    /**
     * Table initialization and resizing control.  When negative, the
     * table is being initialized or resized: -1 for initialization,
     * else -(1 + the number of active resizing threads).  Otherwise,
     * when table is null, holds the initial table size to use upon
     * creation, or 0 for default. After initialization, holds the
     * next element count value upon which to resize the table.
     * <p>
     * 控制標識符,用來控制table的初始化和擴容的操作,不同的值有不同的含義
     * <p>
     * 1. 當為負數時:-1代表正在初始化,-N代表有N-1個線程正在進行擴容
     * <p>
     * 2.當為0時:代表當時的table還沒有被初始化
     * <p>
     * 3.當為正數時:未初始化表示的是初始化數組的初始容量,如果已經初始化,
     * 記錄的是擴容的閾值(達到閾值進行擴容)
     */
    private transient volatile int sizeCtl;

再看一下ConcurrentHashMap帶初始化容量的代碼

   /**
     * Creates a new, empty map with an initial table size
     * accommodating the specified number of elements without the need
     * to dynamically resize.
     *
     * @param initialCapacity The implementation performs internal
     * sizing to accommodate this many elements.
     * @throws IllegalArgumentException if the initial capacity of
     * elements is negative
     *
     * 此時sizeCtl記錄的就是數組的初始化容量
     * 
     * 比如initialCapacity=5
     * 調用tableSizeFor(5+5/2+1)==tableSizeFor(8)
     */
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }
    
    /**
     * Returns a power of two table size for the given desired capacity.
     * See Hackers Delight, sec 3.2
     * 返回一個大於等於c的2的冪次方數
     *  
     * 當c=8時
     * n = c-1=7
     * 接下來驗算最終結果
     * 0000 0000 0000 0000 0000 0000 0000 0111
     * >>> 1
     * = 0000 0000 0000 0000 0000 0000 0000 0011
     * | 0000 0000 0000 0000 0000 0000 0000 0111
     * = 0000 0000 0000 0000 0000 0000 0000 0111
     *  >>> 2
     * = 0000 0000 0000 0000 0000 0000 0000 0001
     * | 0000 0000 0000 0000 0000 0000 0000 0111
     * = 0000 0000 0000 0000 0000 0000 0000 0111
     *  >>> 4
     * = 0000 0000 0000 0000 0000 0000 0000 0000
     * | 0000 0000 0000 0000 0000 0000 0000 0111
     * = 0000 0000 0000 0000 0000 0000 0000 0111
     * 下面再 >>> 8 和 >>> 16後的二進位都是0
     * 所以最終結果就是111,也就是7最後返回結果再+1,等於8
     * 
     * 總結 右移一共1+2+4+8+16=31位,和與之對應 | 運算
     * 最終把n的二進位中所有1移到低位。新的數高位都是0,低位都是1。這樣格式的數在HashMap中提到過,就是2的冪次-1。
     * 最後結果是這個數+1,那就是2的冪次。
     */
    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

當我們new ConcurrentHashMap(c) 時,初始化容量並不是c,而是一個大於等於c的2的冪次方數。我們利用發射來驗證下

public static void main(String[] args) {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(5);
        Class clazz = concurrentHashMap.getClass();
        try {
            Field field = clazz.getDeclaredField("sizeCtl");
            //打開私有訪問
            field.setAccessible(true);
            //獲取屬性
            String name = field.getName();
            //獲取屬性值
            Object value = field.get(concurrentHashMap);
            System.out.println("ConcurrentHashMap的初始容量為:= "+value);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
--列印結果是: Map的初始容量=8

put & putVal

    /**
     * Maps the specified key to the specified value in this table.
     * Neither the key nor the value can be null.
     *
     * <p>The value can be retrieved by calling the {@code get} method
     * with a key that is equal to the original key.
     *
     * @param key   key with which the specified value is to be associated
     * @param value value to be associated with the specified key
     * @return the previous value associated with {@code key}, or
     * {@code null} if there was no mapping for {@code key}
     * @throws NullPointerException if the specified key or value is null
     */
    @Override
    public V put(K key, V value) {
        return putVal(key, value, false);
    }

    /**
     * Implementation for put and putIfAbsent
     *
     */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        //如果有空值或者空鍵,直接拋異常
        if (key == null || value == null) {
            throw new NullPointerException();
        }
        //兩次hash,減少hash衝突,可以均勻分布
        int hash = spread(key.hashCode());
        int binCount = 0;
        //疊代當前table
        for (Node<K, V>[] tab = table; ; ) {
            Node<K, V> f;
            int n, i, fh;
            //1. 如果table未初始化,先初始化
            if (tab == null || (n = tab.length) == 0) {
                tab = initTable();
            }
            //如果i位置沒有數據,cas插入
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //cas和外側else if條件形成雙保險,保證數據安全
                if (casTabAt(tab, i, null,
                        new Node<K, V>(hash, key, value, null))) {
                    break;  // no lock when adding to empty bin
                }
            }
            //2. hash值是MOVED表示數組正在擴容,則協助擴容,先擴容在新加元素
            else if ((fh = f.hash) == MOVED) {
                tab = helpTransfer(tab, f);
            } else {
                //hash計算的bucket不為空,且當前沒有處於擴容操作,進行元素添加
                V oldVal = null;
                //對當前bucket進行加鎖,保證線程安全,執行元素添加操作
                synchronized (f) {
                    //判斷是否為f,防止它變成tree
                    if (tabAt(tab, i) == f) {
                        //hash值>=0 表示該節點是鍊表結構
                        if (fh >= 0) {
                            binCount = 1;
                            //e記錄的是頭節點
                            for (Node<K, V> e = f; ; ++binCount) {
                                K ek;
                                //相同的key進行put就會覆蓋原先的value
                                if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                                (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K, V> pred = e;
                                if ((e = e.next) == null) {
                                    //插入鍊表尾部
                                    pred.next = new Node<K, V>(hash, key,
                                            value, null);
                                    break;
                                }
                            }
                        } else if (f instanceof TreeBin) {
                            Node<K, V> p;
                            binCount = 2;
                            //紅黑樹結構旋轉插入
                            if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key,
                                    value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent) {
                                    p.val = value;
                                }
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    //鍊表長度大於8時轉換紅黑樹
                    if (binCount >= TREEIFY_THRESHOLD) {
                        treeifyBin(tab, i);
                    }
                    if (oldVal != null) {
                        return oldVal;
                    }
                    break;
                }
            }
        }
        //統計size,並且檢查是否需要擴容
        addCount(1L, binCount); 
        return null;
    }

putVal()總體是自旋+CAS的方式,流程和HashMap一樣。

  • 自旋:如果table==null,調用initTable()初始化如果沒有hash碰撞就CAS添加如果正在擴容就協助擴容如果存在hash碰撞,如果是單向列表就插到bucket尾部,如果是紅黑樹就插入數結構如果鍊表bucket長度大於8,轉紅黑樹如果添加成功就調用addCount()方法統計size,檢查是否需要擴容

從源碼中可以看到put新元素時,如果發生hash衝突,先鎖定發生衝突的bucket,不影響其他bucket操作,達到並發安全且高效的目的。下面是`putVal

initTable,初始化table

   /**
     * Initializes table, using the size recorded in sizeCtl.
     * 初始化table,從新記錄sizeCtl值,此時值為數組下次擴容的閾值
     */
    private final Node<K, V>[] initTable() {
        Node<K, V>[] tab;
        int sc;
        //再次判斷空的table才能進入初始化操作
        while ((tab = table) == null || tab.length == 0) {
            // sizeCtl<0,也就是下面elseif把sizeCtl設置成-1. 表示其他線程已經在初始化了或者擴容了,掛起當前線程,自旋等待
            if ((sc = sizeCtl) < 0) {
                Thread.yield(); 
             //CAS設置SIZECTL為-1,如果設置成功繼續執行下面操作,如果失敗,說明此時有其他線程正在執行操作,繼續自旋
            } else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    //double check,保證線程安全,可能有線程已經同步完了
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
                        table = tab = nt;
                        //記錄下次擴容的大小,相當於n-n/4=0.75n
                        sc = n - (n >>> 2); 
                    }
                } finally {
                    //此時sizeCtl的值為下次擴容的閾值
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

helpTransfer 協助擴容

    /**
     * Helps transfer if a resize is in progress.
     * <p>
     * 如果數組正在擴容,協助之,多個工作線程一起擴容
     * 從舊的table的元素複製到新的table中
     *
     */
    final Node<K, V>[] helpTransfer(Node<K, V>[] tab, Node<K, V> f) {
        Node<K, V>[] nextTab;
        int sc;
        //如果f是ForwardingNode,說明f正在擴容,hash值已經被標為MOVED。
        //ForwardingNode.nextTable就是新table不為空
        if (tab != null && (f instanceof ForwardingNode) &&
                (nextTab = ((ForwardingNode<K, V>) f).nextTable) != null) {
            //根據 length 得到一個前16位的標識符,數組容量大小。
            int rs = resizeStamp(tab.length);
            //多重條件判斷未擴容完成,還在進行中,新老數組都沒有變,且sizeCtl<0
            while (nextTab == nextTable && table == tab &&
                    (sc = sizeCtl) < 0) {
            // 1. sizeCtl 無符號右移16位獲得高16位如果不等 rs 標識符變了
            // 2. (sc == rs + 1),表示擴容結束 
            // 3. (sc == rs + MAX_RESIZERS)達到了最大幫助線程個數 65535個
            // 4. transferIndex<= 0 也表示擴容已經結束
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                //增加一個線程幫助擴容
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

TreeNode結構

    /**
     * Nodes for use in TreeBins
     */
    static final class TreeNode<K, V> extends Node<K, V> {
        TreeNode<K, V> parent;  // red-black tree links
        TreeNode<K, V> left;
        TreeNode<K, V> right;
        TreeNode<K, V> prev;    // needed to unlink next upon deletion
        boolean red;

        TreeNode(int hash, K key, V val, Node<K, V> next,
                 TreeNode<K, V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }
    }

TreeNode繼承了Node,又多了prev等,這裡很少人注意,其實它在維護紅黑樹的同時也維護了雙向列表。雖然紅黑樹查詢方便,但遷移真的好難,藉助雙向列表做遷移會容易很多。

transfer 單向列表擴容

    /**
     * Moves and/or copies the nodes in each bin to new table. See
     * above for explanation.
     * 多線程擴容操作
     */
    private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) {
        int n = tab.length, stride;
        //數組遷移分塊執行,每核處理的bucket量小於16個,則強制賦值16,
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) {
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        }
        //如果是擴容線程,此時新數組為null
        if (nextTab == null) {            // initiating
            try {
                //構建新數組,其容量為原來容量的2倍
                @SuppressWarnings("unchecked")
                Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            //記錄線程開始遷移的bucket,從後往前遷移
            transferIndex = n;
        }
        int nextn = nextTab.length;
        //已經遷移的桶位,會用fwd占位(這個節點的hash值為MOVED),這個在put方法中見到過
        ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab);
        // 當advance == true時,表明該節點已經處理過了
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0; ; ) {
            Node<K, V> f;
            int fh;
            //計算每一個線程負責哪部分,遷移以後賦fwd節點          
            //i記錄當前正在遷移桶位的索引值
            //bound記錄下一次任務遷移的開始桶位
            //--i>=bound 表示當前線程分配的遷移任務還沒有完成
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing) {
                    advance = false;
                //沒有元素需要遷移
                } else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                } else if (U.compareAndSwapInt // 用CAS計算得到下一次任務遷移的開始桶位,值值給transferIndex
                        (this, TRANSFERINDEX, nextIndex,
                                nextBound = (nextIndex > stride ?
                                        nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //沒有更多的需要遷移的bucket
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                // 擴容結束後,table指向新數組,重新計算擴容閾值,賦值給sizeCtl
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                // 擴容任務線程數減1
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    //判斷當前所有擴容任務是否執行完成,相等表明完成
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {
                        return;
                    }
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            } else if ((f = tabAt(tab, i)) == null) { //當前節點為null,在該位置添加一個ForwardingNode
                advance = casTabAt(tab, i, null, fwd);
            } else if ((fh = f.hash) == MOVED) {//如果是ForwardingNode,說明已經擴容過
                advance = true; // already processed
            } else {
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K, V> ln, hn;
                        // fh >= 0 ,表示為鍊表節點
                        if (fh >= 0) {
                            // 構造兩個鍊表,一個是原鍊表,另一個是原鍊表的反序排列
                            int runBit = fh & n;
                            Node<K, V> lastRun = f;
                            for (Node<K, V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            } else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K, V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash;
                                K pk = p.key;
                                V pv = p.val;
                                if ((ph & n) == 0) {
                                    ln = new Node<K, V>(ph, pk, pv, ln);
                                } else {
                                    hn = new Node<K, V>(ph, pk, pv, hn);
                                }
                            }
                            // 先擴容再插入相應值
                            //新table的i位置添加元素
                            setTabAt(nextTab, i, ln);
                            //新table的i+1位置添加元素
                            setTabAt(nextTab, i + n, hn);
                            // 舊table i 位置處插上ForwardingNode,表示該節點已經處理過
                            setTabAt(tab, i, fwd);
                            advance = true;
                            // 紅黑樹處理邏輯,實質上是維護雙向鍊表
                        } else if (f instanceof TreeBin) {
                            TreeBin<K, V> t = (TreeBin<K, V>) f;
                            TreeNode<K, V> lo = null, loTail = null;
                            TreeNode<K, V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K, V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K, V> p = new TreeNode<K, V>
                                        (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null) {
                                        lo = p;
                                    } else {
                                        loTail.next = p;
                                    }
                                    loTail = p;
                                    ++lc;
                                } else {
                                    if ((p.prev = hiTail) == null) {
                                        hi = p;
                                    } else {
                                        hiTail.next = p;
                                    }
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            // 擴容後紅黑樹節點個數若<=6,將樹轉單向鍊表
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K, V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K, V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

總結

ConcurrentHashMap設計之精妙驚為天人,不愧為大師之作。有3個點可能之前沒有注意

  1. new ConcurrentHashMap(c)時,初始容量並不是傳入的值。而是一個大於等於該值的2的冪次方值
  2. 世人都知道鍊表大於6的時候會轉紅黑樹,卻很少有人提及在紅黑樹節點個數小於等於6時會轉成鍊表
  3. ConcurrentHashMap和HashMap的數據結構嚴格的說應該是數組+單向列表+(紅黑樹+雙向鍊表)本人水平有限,文中難免會有謬誤,還請各位指出。參考jdk1.8 & jdk1.7#ConcurrentHashMap源碼黑馬程式設計師公開課
關鍵字: