一区二区三区日韩精品-日韩经典一区二区三区-五月激情综合丁香婷婷-欧美精品中文字幕专区

分享

BlockingQueue及其各個實現(xiàn)的分析整理 | 三石·道

 飛起來的感覺 2014-03-07

前面在整理的ThreadPoolExecutor類中,有一個從提交任務(wù)到線程池分配線程執(zhí)行任務(wù),有一個隊列,而這個隊列采用的就是BlockingQueue。BlockingQueue實際上定義了一個接口,在java.util.concurrent包中給出了這個接口的一些實現(xiàn),這篇我們整理一下。

0. BlockingQueue簡介

BlockingQueue是java.util.concurrent包中的接口,擴展了java.util中的的Queue接口。

在Java7的API中,這個接口有11個public方法。

但對于BlockingQueue來說,其本身就是一個就是一個阻塞隊列,所以這些操作的方法中,最重要的兩個就是put()和take()方法,這也是本篇中重點分析的地方,其它的方法可以參見JavaDoc文檔。

BlockingQueue的實現(xiàn)有一個特點,隊列元素不接受null值。

BlockingQueue這個接口在JDK中提供了很多具體實現(xiàn),包括了數(shù)組、鏈表等實現(xiàn),下面就對這些實現(xiàn)類簡要分析下。

1. 數(shù)組實現(xiàn)的ArrayBlockingQueue

看下ArrayBlockingQueue的構(gòu)造方法,一共有三個:

  • ArrayBlockingQueue(int capacity)
  • ArrayBlockingQueue(int capacity, boolean fair)
  • ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)

我們發(fā)現(xiàn),構(gòu)造方法中并沒有無參的,這意味著隊列的容量是沒有默認的,在使用的時候需要給出容量值。

后兩個構(gòu)造方法還有fair這個參數(shù)。這個fair可謂是似曾相識,其實它就是ReentrantLock對象初始化要用到的那個參數(shù)。我們知道ArrayBlockingQueue既然是阻塞隊列,那么一定會有阻塞和喚醒,這里的實現(xiàn)用到的是Condition的await()和signal() / signalAll(),而用Condition的前提就是有對應(yīng)的Lock對象,在ArrayBlockingQueue實現(xiàn)中,take()和put()用的是統(tǒng)一的一個單鎖。在ArrayBlockingQueue的某些并發(fā)操作方法中,是需要加鎖來保證線程安全的,而這就是fair參數(shù)的作用。

對于隊列“空”和“滿”的情況,分別使用了兩個Condition對象來維護。

另外,ArrayBlockingQueue類我們直接理解就是數(shù)組實現(xiàn)的阻塞隊列。沒錯,其中的數(shù)據(jù)元素是用Object[]來保存的。對于take()和put()方法,則是分別使用了takeIndex和putIndex這兩個索引值來記錄存放數(shù)據(jù)的位置。

1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return extract();
    } finally {
        lock.unlock();
    }
}

如上,是take()方法實現(xiàn)的源碼。邏輯很簡單,先加鎖,然后判斷是否隊列已空,如條件為真,則阻塞,然后取出隊列中的元素。我們看到,阻塞是通過對notEmpty這個Condition對象的await()方法調(diào)用來做到的,與此對應(yīng),extract()方法中實際上也有一個notFull.signal()的調(diào)用。

2. 單向鏈表實現(xiàn)的LinkedBlockingQueue

LinkedBlockingQueue是JDK中BlockingQueue的有一個主要的實現(xiàn)。按照JavaDoc上所述,LinkedBlockingQueue是一個容量可選的阻塞隊列。存在LinkedBlockingQueue()無參的默認構(gòu)造方法實現(xiàn),使用Integer.MAX_VALUE作為默認容量。

在LinkedBlockingQueue類的實現(xiàn)中,很重要的一個和ArrayBlockingQueue不同的地方,是對put()和take()分別使用了兩個不同的鎖,都使用了ReentrantLock實現(xiàn)。而針對“空”和“滿”的阻塞條件,也是對這兩個所對象分別構(gòu)建的兩個Condition對象(notEmpty和notFull),構(gòu)成了雙鎖雙條件。此外,LinkedBlockingQueue也為take和put操作分別維護了索引takeIndex和putIndex。兩鎖或者說隊列狀態(tài)的協(xié)調(diào)一致其實也是通過兩個條件對象的await()和signal()來達成。

1
2
3
4
5
6
7
8
9
10
11
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

此外,對于隊列中元素的計數(shù),LinkedBlockingQueue也和ArrayBlockingQueue的實現(xiàn)略有不同,使用了AtomicInteger類對象。

對于put()和take()以及類似的操作,雙鎖避免了互相影響,一定意義上看,減小了操作的鎖粒度,提高了并發(fā)性。

但對于其他操作,為了保證線程安全,都是雙鎖同時鎖定。雙鎖使用要避免死鎖問題,這個類實現(xiàn)中是統(tǒng)一定義了fullyLock()和fullyUnlock()的方法,先鎖定的后釋放,避免死鎖發(fā)生的可能。

除了用數(shù)組和隊列不同數(shù)據(jù)結(jié)構(gòu)對BlockingQueue接口的基本實現(xiàn)外,還有其他幾種有特殊功能的實現(xiàn)。

3. DelayQueue

基本特征是容量無界,實現(xiàn)上單鎖單條件。

功能特點上,實際上是對優(yōu)先級隊列PriorityQueue類的一個封裝。放入隊列的元素要滿足要求<E extends Delayed>。比較器是時間,因為:

1
public interface Delayed extends Comparable<Delayed>

元素需要給出getDelay()方法(實際上是Delayed接口的要求)。

等待第一個元素的線程被設(shè)置為leader,后續(xù)線程無限期等待,直到leader通知他們。隊列中的數(shù)據(jù)元素超時后,元素可被返回。

4. 同步隊列SynchronousQueue

這個類在Executors中使用ThreadPoolExecutor類構(gòu)造CachedThreadPool的時候被用到了。SynchronousQueue的特點是,讀取操作take()和放入操作put()同時完成才會同事解開阻塞。即一個元素只有當(dāng)其本身被take()的時候put()才會被喚醒。沒有容量的概念。

構(gòu)造方法中可以帶fair參數(shù),分為公平和非公平實現(xiàn),具體的實現(xiàn)分別為隊列和棧,順序不同。具體的代碼實現(xiàn)依賴于內(nèi)部類TransferQueue和TransferStack,邏輯較為復(fù)雜,這里不做細節(jié)分析。實現(xiàn)中的阻塞機制直接使用LockSupport的park()方法。

5. 順便說說Exchanger類

這個類也是java.util.concurrent包中的,但和BlockingQueue并無直接層次結(jié)構(gòu)關(guān)系。這里提到它主要是因為從用法上來看,相當(dāng)于一個二項的SynchronousQueue。

具體實現(xiàn)上比較復(fù)雜,不做詳細分析,記錄下幾點:

  • 注意到Slot和Node都是AtomicReference,其compareAndSet并不是設(shè)置node或者item,而是引用值,巧妙的利用了Node的引用值和item做數(shù)據(jù)交換
  • (高并發(fā)情況)實現(xiàn)上用了類似concurrentHashMap的segment方式,有插槽Slot的概念
  • 阻塞機制用Locksupport.park()

6. TransferQueue

最后說下TransferQueue這個接口,這個類是java.util.concurrent包中在Java7中增加的,可以看到注釋中的“@since 1.7”。和前面的不同,TransferQueue只是一個接口,不是一個實現(xiàn)。在JDK1.7中,有LinkedTransferQueue這樣一個實現(xiàn)類。需要注意區(qū)分,這個TransferQueue和SynchronousQueue的內(nèi)部實現(xiàn)類TransferQueue不是同一個類。

這個接口/類實際上是一個比SynchronousQueue更靈活更高級的同步隊列,放入新元素可以阻塞也可以非阻塞,并且也可以設(shè)定隊列的元素容量。

這篇對BlockingQueue的小結(jié)就到這里。

0

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多

    一区二区三区人妻在线| 国产精品亚洲综合天堂夜夜| 亚洲中文字幕综合网在线| 日韩在线中文字幕不卡| 日韩一区中文免费视频| 精品久久久一区二区三| 女人高潮被爽到呻吟在线观看 | 亚洲综合色在线视频香蕉视频| 国产欧美日韩精品一区二区| 最近最新中文字幕免费| 日本不卡在线一区二区三区| 1024你懂的在线视频| 日韩少妇人妻中文字幕| 欧美一区二区三区十区| 风韵人妻丰满熟妇老熟女av | 国产精品国产亚洲区久久| 欧美日本精品视频在线观看| 欧美日韩国产午夜福利| 欧美午夜国产在线观看| 日韩一本不卡在线观看| 中文字幕亚洲人妻在线视频| 91播色在线免费播放| 大香蕉久草网一区二区三区| 中文字幕日韩欧美理伦片| 亚洲欧美日韩精品永久| 91熟女大屁股偷偷对白| 欧美精品中文字幕亚洲| 中文字幕一区二区三区大片| 日韩亚洲激情在线观看| 久久99午夜福利视频| 成人日韩在线播放视频| 日韩性生活片免费观看| 日韩中文无线码在线视频| 欧美大胆美女a级视频| 亚洲综合激情另类专区老铁性| 国产女优视频一区二区| 久久99这里只精品热在线| 熟女少妇久久一区二区三区| 国产又猛又大又长又粗| 丁香六月婷婷基地伊人| 后入美臀少妇一区二区|