前面在整理的ThreadPoolExecutor類中,有一個從提交任務(wù)到線程池分配線程執(zhí)行任務(wù),有一個隊列,而這個隊列采用的就是BlockingQueue。BlockingQueue實際上定義了一個接口,在java.util.concurrent包中給出了這個接口的一些實現(xiàn),這篇我們整理一下。 0. BlockingQueue簡介 BlockingQueue是java.util.concurrent包中的接口,擴展了java.util中的的Queue接口。 在Java7的API中,這個接口有11個public方法。 但對于BlockingQueue來說,其本身就是一個就是一個阻塞隊列,所以這些操作的方法中,最重要的兩個就是put()和take()方法,這也是本篇中重點分析的地方,其它的方法可以參見JavaDoc文檔。 BlockingQueue的實現(xiàn)有一個特點,隊列元素不接受null值。 BlockingQueue這個接口在JDK中提供了很多具體實現(xiàn),包括了數(shù)組、鏈表等實現(xiàn),下面就對這些實現(xiàn)類簡要分析下。 1. 數(shù)組實現(xiàn)的ArrayBlockingQueue 看下ArrayBlockingQueue的構(gòu)造方法,一共有三個:
我們發(fā)現(xiàn),構(gòu)造方法中并沒有無參的,這意味著隊列的容量是沒有默認的,在使用的時候需要給出容量值。 后兩個構(gòu)造方法還有fair這個參數(shù)。這個fair可謂是似曾相識,其實它就是ReentrantLock對象初始化要用到的那個參數(shù)。我們知道ArrayBlockingQueue既然是阻塞隊列,那么一定會有阻塞和喚醒,這里的實現(xiàn)用到的是Condition的await()和signal() / signalAll(),而用Condition的前提就是有對應(yīng)的Lock對象,在ArrayBlockingQueue實現(xiàn)中,take()和put()用的是統(tǒng)一的一個單鎖。在ArrayBlockingQueue的某些并發(fā)操作方法中,是需要加鎖來保證線程安全的,而這就是fair參數(shù)的作用。 對于隊列“空”和“滿”的情況,分別使用了兩個Condition對象來維護。 另外,ArrayBlockingQueue類我們直接理解就是數(shù)組實現(xiàn)的阻塞隊列。沒錯,其中的數(shù)據(jù)元素是用Object[]來保存的。對于take()和put()方法,則是分別使用了takeIndex和putIndex這兩個索引值來記錄存放數(shù)據(jù)的位置。
如上,是take()方法實現(xiàn)的源碼。邏輯很簡單,先加鎖,然后判斷是否隊列已空,如條件為真,則阻塞,然后取出隊列中的元素。我們看到,阻塞是通過對notEmpty這個Condition對象的await()方法調(diào)用來做到的,與此對應(yīng),extract()方法中實際上也有一個notFull.signal()的調(diào)用。 2. 單向鏈表實現(xiàn)的LinkedBlockingQueue LinkedBlockingQueue是JDK中BlockingQueue的有一個主要的實現(xiàn)。按照JavaDoc上所述,LinkedBlockingQueue是一個容量可選的阻塞隊列。存在LinkedBlockingQueue()無參的默認構(gòu)造方法實現(xiàn),使用Integer.MAX_VALUE作為默認容量。 在LinkedBlockingQueue類的實現(xiàn)中,很重要的一個和ArrayBlockingQueue不同的地方,是對put()和take()分別使用了兩個不同的鎖,都使用了ReentrantLock實現(xiàn)。而針對“空”和“滿”的阻塞條件,也是對這兩個所對象分別構(gòu)建的兩個Condition對象(notEmpty和notFull),構(gòu)成了雙鎖雙條件。此外,LinkedBlockingQueue也為take和put操作分別維護了索引takeIndex和putIndex。兩鎖或者說隊列狀態(tài)的協(xié)調(diào)一致其實也是通過兩個條件對象的await()和signal()來達成。
此外,對于隊列中元素的計數(shù),LinkedBlockingQueue也和ArrayBlockingQueue的實現(xiàn)略有不同,使用了AtomicInteger類對象。 對于put()和take()以及類似的操作,雙鎖避免了互相影響,一定意義上看,減小了操作的鎖粒度,提高了并發(fā)性。 但對于其他操作,為了保證線程安全,都是雙鎖同時鎖定。雙鎖使用要避免死鎖問題,這個類實現(xiàn)中是統(tǒng)一定義了fullyLock()和fullyUnlock()的方法,先鎖定的后釋放,避免死鎖發(fā)生的可能。 除了用數(shù)組和隊列不同數(shù)據(jù)結(jié)構(gòu)對BlockingQueue接口的基本實現(xiàn)外,還有其他幾種有特殊功能的實現(xiàn)。 3. DelayQueue 基本特征是容量無界,實現(xiàn)上單鎖單條件。 功能特點上,實際上是對優(yōu)先級隊列PriorityQueue類的一個封裝。放入隊列的元素要滿足要求<E extends Delayed>。比較器是時間,因為:
元素需要給出getDelay()方法(實際上是Delayed接口的要求)。 等待第一個元素的線程被設(shè)置為leader,后續(xù)線程無限期等待,直到leader通知他們。隊列中的數(shù)據(jù)元素超時后,元素可被返回。 4. 同步隊列SynchronousQueue 這個類在Executors中使用ThreadPoolExecutor類構(gòu)造CachedThreadPool的時候被用到了。SynchronousQueue的特點是,讀取操作take()和放入操作put()同時完成才會同事解開阻塞。即一個元素只有當(dāng)其本身被take()的時候put()才會被喚醒。沒有容量的概念。 構(gòu)造方法中可以帶fair參數(shù),分為公平和非公平實現(xiàn),具體的實現(xiàn)分別為隊列和棧,順序不同。具體的代碼實現(xiàn)依賴于內(nèi)部類TransferQueue和TransferStack,邏輯較為復(fù)雜,這里不做細節(jié)分析。實現(xiàn)中的阻塞機制直接使用LockSupport的park()方法。 5. 順便說說Exchanger類 這個類也是java.util.concurrent包中的,但和BlockingQueue并無直接層次結(jié)構(gòu)關(guān)系。這里提到它主要是因為從用法上來看,相當(dāng)于一個二項的SynchronousQueue。 具體實現(xiàn)上比較復(fù)雜,不做詳細分析,記錄下幾點:
6. TransferQueue 最后說下TransferQueue這個接口,這個類是java.util.concurrent包中在Java7中增加的,可以看到注釋中的“@since 1.7”。和前面的不同,TransferQueue只是一個接口,不是一個實現(xiàn)。在JDK1.7中,有LinkedTransferQueue這樣一個實現(xiàn)類。需要注意區(qū)分,這個TransferQueue和SynchronousQueue的內(nèi)部實現(xiàn)類TransferQueue不是同一個類。 這個接口/類實際上是一個比SynchronousQueue更靈活更高級的同步隊列,放入新元素可以阻塞也可以非阻塞,并且也可以設(shè)定隊列的元素容量。 這篇對BlockingQueue的小結(jié)就到這里。 相關(guān)文章:
|
|