集合是編程中最常用的數(shù)據(jù)結(jié)構(gòu)。而談到并發(fā),幾乎總是離不開集合這類高級數(shù)據(jù)結(jié)構(gòu)的支持。比如兩個線程需要同時訪問一個中間臨界區(qū)(Queue),比如常會用緩存作為外部文件的副本(HashMap)。這篇文章主要分析jdk1.5的3種并發(fā)集合類型(concurrent,copyonright,queue)中的ConcurrentHashMap,讓我們從原理上細(xì)致的了解它們,能夠讓我們在深度項(xiàng)目開發(fā)中獲益非淺。 通過分析Hashtable就知道,synchronized是針對整張Hash表的,即每次鎖住整張表讓線程獨(dú)占,ConcurrentHashMap允許多個修改操作并發(fā)進(jìn)行,其關(guān)鍵在于使用了鎖分離技術(shù)。它使用了多個鎖來控制對hash表的不同部分進(jìn)行的修改。ConcurrentHashMap內(nèi)部使用段(Segment)來表示這些不同的部分,每個段其實(shí)就是一個小的hash table,它們有自己的鎖。只要多個修改操作發(fā)生在不同的段上,它們就可以并發(fā)進(jìn)行。 一、結(jié)構(gòu)解析 ConcurrentHashMap和Hashtable主要區(qū)別就是圍繞著鎖的粒度以及如何鎖,可以簡單理解成把一個大的HashTable分解成多個,形成了鎖分離。如圖:
而Hashtable的實(shí)現(xiàn)方式是---鎖整個hash表 二、應(yīng)用場景 當(dāng)有一個大數(shù)組時需要在多個線程共享時就可以考慮是否把它給分層多個節(jié)點(diǎn)了,避免大鎖。并可以考慮通過hash算法進(jìn)行一些模塊定位。 其實(shí)不止用于線程,當(dāng)設(shè)計數(shù)據(jù)表的事務(wù)時(事務(wù)某種意義上也是同步機(jī)制的體現(xiàn)),可以把一個表看成一個需要同步的數(shù)組,如果操作的表數(shù)據(jù)太多時就可以考慮事務(wù)分離了(這也是為什么要避免大表的出現(xiàn)),比如把數(shù)據(jù)進(jìn)行字段拆分,水平分表等. 三、源碼解讀 ConcurrentHashMap中主要實(shí)體類就是三個:ConcurrentHashMap(整個Hash表),Segment(桶),HashEntry(節(jié)點(diǎn)),對應(yīng)上面的圖可以看出之間的關(guān)系 /**
* The segments, each of which is a specialized hash table
*/
final Segment<K,V>[] segments; 不變(Immutable)和易變(Volatile)
1. static final class HashEntry<K,V> {
2. final K key;
3. final int hash;
4. volatile V value;
5. final HashEntry<K,V> next;
6. }
可以看到除了value不是final的,其它值都是final的,這意味著不能從hash鏈的中間或尾部添加或刪除節(jié)點(diǎn),因?yàn)檫@需要修改next 引用值,所有的節(jié)點(diǎn)的修改只能從頭部開始。對于put操作,可以一律添加到Hash鏈的頭部。但是對于remove操作,可能需要從中間刪除一個節(jié)點(diǎn),這就需要將要刪除節(jié)點(diǎn)的前面所有節(jié)點(diǎn)整個復(fù)制一遍,最后一個節(jié)點(diǎn)指向要刪除結(jié)點(diǎn)的下一個結(jié)點(diǎn)。這在講解刪除操作時還會詳述。為了確保讀操作能夠看到最新的值,將value設(shè)置成volatile,這避免了加鎖。
這是定位段的方法: 1. final Segment<K,V> segmentFor(int hash) {
2. return segments[(hash >>> segmentShift) & segmentMask];
3. } 數(shù)據(jù)結(jié)構(gòu)
1. public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
2. implements ConcurrentMap<K, V>, Serializable {
3. /**
4. * Mask value for indexing into segments. The upper bits of a
5. * key's hash code are used to choose the segment.
6. */
7. final int segmentMask;
8.
9. /**
10. * Shift value for indexing within segments.
11. */
12. final int segmentShift;
13.
14. /**
15. * The segments, each of which is a specialized hash table
16. */
17. final Segment<K,V>[] segments;
18. }
所有的成員都是final的,其中segmentMask和segmentShift主要是為了定位段,參見上面的segmentFor方法。
1. static final class Segment<K,V> extends ReentrantLock implements Serializable {
2. private static final long serialVersionUID = 2249069246763182397L;
3. /**
4. * The number of elements in this segment's region.
5. */
6. transient volatile int count;
7.
8. /**
9. * Number of updates that alter the size of the table. This is
10. * used during bulk-read methods to make sure they see a
11. * consistent snapshot: If modCounts change during a traversal
12. * of segments computing size or checking containsValue, then
13. * we might have an inconsistent view of state so (usually)
14. * must retry.
15. */
16. transient int modCount;
17.
18. /**
19. * The table is rehashed when its size exceeds this threshold.
20. * (The value of this field is always <tt>(int)(capacity *
21. * loadFactor)</tt>.)
22. */
23. transient int threshold;
24.
25. /**
26. * The per-segment table.
27. */
28. transient volatile HashEntry<K,V>[] table;
29.
30. /**
31. * The load factor for the hash table. Even though this value
32. * is same for all segments, it is replicated to avoid needing
33. * links to outer object.
34. * @serial
35. */
36. final float loadFactor;
37. }
count用來統(tǒng)計該段數(shù)據(jù)的個數(shù),它是volatile(volatile 變量使用指南),它用來協(xié)調(diào)修改和讀取操作,以保證讀取操作能夠讀取到幾乎最新的修改。協(xié)調(diào)方式是這樣的,每次修改操作做了結(jié)構(gòu)上的改變,如增加/刪除節(jié)點(diǎn)(修改節(jié)點(diǎn)的值不算結(jié)構(gòu)上的改變),都要寫count值,每次讀取操作開始都要讀取count的值。這利用了 Java 5中對volatile語義的增強(qiáng),對同一個volatile變量的寫和讀存在happens-before關(guān)系。modCount統(tǒng)計段結(jié)構(gòu)改變的次數(shù),主要是為了檢測對多個段進(jìn)行遍歷過程中某個段是否發(fā)生改變,在講述跨段操作時會還會詳述。threashold用來表示需要進(jìn)行rehash的界限值。table數(shù)組存儲段中節(jié)點(diǎn),每個數(shù)組元素是個hash鏈,用HashEntry表示。table也是volatile,這使得能夠讀取到最新的 table值而不需要同步。loadFactor表示負(fù)載因子。 先來看下刪除操作remove(key)。
1. public V remove(Object key) {
2. hash = hash(key.hashCode());
3. return segmentFor(hash).remove(key, hash, null);
4. }
整個操作是先定位到段,然后委托給段的remove操作。當(dāng)多個刪除操作并發(fā)進(jìn)行時,只要它們所在的段不相同,它們就可以同時進(jìn)行。下面是Segment的remove方法實(shí)現(xiàn):
1. V remove(Object key, int hash, Object value) {
2. lock();
3. try {
4. int c = count - 1;
5. HashEntry<K,V>[] tab = table;
6. int index = hash & (tab.length - 1);
7. HashEntry<K,V> first = tab[index];
8. HashEntry<K,V> e = first;
9. while (e != null && (e.hash != hash || !key.equals(e.key)))
10. e = e.next;
11.
12. V oldValue = null;
13. if (e != null) {
14. V v = e.value;
15. if (value == null || value.equals(v)) {
16. oldValue = v;
17. // All entries following removed node can stay
18. // in list, but all preceding ones need to be
19. // cloned.
20. ++modCount;
21. HashEntry<K,V> newFirst = e.next;
22. *for (HashEntry<K,V> p = first; p != e; p = p.next)
23. *newFirst = new HashEntry<K,V>(p.key, p.hash,
24. newFirst, p.value);
25. tab[index] = newFirst;
26. count = c; // write-volatile
27. }
28. }
29. return oldValue;
30. } finally {
31. unlock();
32. }
33. }
整個操作是在持有段鎖的情況下執(zhí)行的,空白行之前的行主要是定位到要刪除的節(jié)點(diǎn)e。接下來,如果不存在這個節(jié)點(diǎn)就直接返回null,否則就要將e前面的結(jié)點(diǎn)復(fù)制一遍,尾結(jié)點(diǎn)指向e的下一個結(jié)點(diǎn)。e后面的結(jié)點(diǎn)不需要復(fù)制,它們可以重用。 中間那個for循環(huán)是做什么用的呢?(*號標(biāo)記)從代碼來看,就是將定位之后的所有entry克隆并拼回前面去,但有必要嗎?每次刪除一個元素就要將那之前的元素克隆一遍?這點(diǎn)其實(shí)是由entry的不變性來決定的,仔細(xì)觀察entry定義,發(fā)現(xiàn)除了value,其他所有屬性都是用final來修飾的,這意味著在第一次設(shè)置了next域之后便不能再改變它,取而代之的是將它之前的節(jié)點(diǎn)全都克隆一次。至于entry為什么要設(shè)置為不變性,這跟不變性的訪問不需要同步從而節(jié)省時間有關(guān) 下面是個示意圖 刪除元素之前:
刪除元素3之后:
第二個圖其實(shí)有點(diǎn)問題,復(fù)制的結(jié)點(diǎn)中應(yīng)該是值為2的結(jié)點(diǎn)在前面,值為1的結(jié)點(diǎn)在后面,也就是剛好和原來結(jié)點(diǎn)順序相反,還好這不影響我們的討論。
1. V put(K key, int hash, V value, boolean onlyIfAbsent) {
2. lock();
3. try {
4. int c = count;
5. if (c++ > threshold) // ensure capacity
6. rehash();
7. HashEntry<K,V>[] tab = table;
8. int index = hash & (tab.length - 1);
9. HashEntry<K,V> first = tab[index];
10. HashEntry<K,V> e = first;
11. while (e != null && (e.hash != hash || !key.equals(e.key)))
12. e = e.next;
13.
14. V oldValue;
15. if (e != null) {
16. oldValue = e.value;
17. if (!onlyIfAbsent)
18. e.value = value;
19. }
20. else {
21. oldValue = null;
22. ++modCount;
23. tab[index] = new HashEntry<K,V>(key, hash, first, value);
24. count = c; // write-volatile
25. }
26. return oldValue;
27. } finally {
28. unlock();
29. }
30. }
該方法也是在持有段鎖(鎖定整個segment)的情況下執(zhí)行的,這當(dāng)然是為了并發(fā)的安全,修改數(shù)據(jù)是不能并發(fā)進(jìn)行的,必須得有個判斷是否超限的語句以確保容量不足時能夠rehash。接著是找是否存在同樣一個key的結(jié)點(diǎn),如果存在就直接替換這個結(jié)點(diǎn)的值。否則創(chuàng)建一個新的結(jié)點(diǎn)并添加到hash鏈的頭部,這時一定要修改modCount和count的值,同樣修改count的值一定要放在最后一步。put方法調(diào)用了rehash方法,reash方法實(shí)現(xiàn)得也很精巧,主要利用了table的大小為2^n,這里就不介紹了。而比較難懂的是這句int index = hash & (tab.length - 1),原來segment里面才是真正的hashtable,即每個segment是一個傳統(tǒng)意義上的hashtable,如上圖,從兩者的結(jié)構(gòu)就可以看出區(qū)別,這里就是找出需要的entry在table的哪一個位置,之后得到的entry就是這個鏈的第一個節(jié)點(diǎn),如果e!=null,說明找到了,這是就要替換節(jié)點(diǎn)的值(onlyIfAbsent == false),否則,我們需要new一個entry,它的后繼是first,而讓tab[index]指向它,什么意思呢?實(shí)際上就是將這個新entry插入到鏈頭,剩下的就非常容易理解了
1. V get(Object key, int hash) {
2. if (count != 0) { // read-volatile 當(dāng)前桶的數(shù)據(jù)個數(shù)是否為0
3. HashEntry<K,V> e = getFirst(hash); 得到頭節(jié)點(diǎn)
4. while (e != null) {
5. if (e.hash == hash && key.equals(e.key)) {
6. V v = e.value;
7. if (v != null)
8. return v;
9. return readValueUnderLock(e); // recheck
10. }
11. e = e.next;
12. }
13. }
14. return null;
15. }
get操作不需要鎖。第一步是訪問count變量,這是一個volatile變量,由于所有的修改操作在進(jìn)行結(jié)構(gòu)修改時都會在最后一步寫count 變量,通過這種機(jī)制保證get操作能夠得到幾乎最新的結(jié)構(gòu)更新。對于非結(jié)構(gòu)更新,也就是結(jié)點(diǎn)值的改變,由于HashEntry的value變量是 volatile的,也能保證讀取到最新的值。接下來就是根據(jù)hash和key對hash鏈進(jìn)行遍歷找到要獲取的結(jié)點(diǎn),如果沒有找到,直接訪回null。對hash鏈進(jìn)行遍歷不需要加鎖的原因在于鏈指針next是final的。但是頭指針卻不是final的,這是通過getFirst(hash)方法返回,也就是存在 table數(shù)組中的值。這使得getFirst(hash)可能返回過時的頭結(jié)點(diǎn),例如,當(dāng)執(zhí)行g(shù)et方法時,剛執(zhí)行完getFirst(hash)之后,另一個線程執(zhí)行了刪除操作并更新頭結(jié)點(diǎn),這就導(dǎo)致get方法中返回的頭結(jié)點(diǎn)不是最新的。這是可以允許,通過對count變量的協(xié)調(diào)機(jī)制,get能讀取到幾乎最新的數(shù)據(jù),雖然可能不是最新的。要得到最新的數(shù)據(jù),只有采用完全的同步。 1. V readValueUnderLock(HashEntry<K,V> e) {
2. lock();
3. try {
4. return e.value;
5. } finally {
6. unlock();
7. }
8. }
另一個操作是containsKey,這個實(shí)現(xiàn)就要簡單得多了,因?yàn)樗恍枰x取值:
1. boolean containsKey(Object key, int hash) {
2. if (count != 0) { // read-volatile
3. HashEntry<K,V> e = getFirst(hash);
4. while (e != null) {
5. if (e.hash == hash && key.equals(e.key))
6. return true;
7. e = e.next;
8. }
9. }
10. return false;
11. }
優(yōu)秀博文:
|
|
來自: dtl樂學(xué)館 > 《多線程》