一区二区三区日韩精品-日韩经典一区二区三区-五月激情综合丁香婷婷-欧美精品中文字幕专区

分享

java并發(fā)容器(Map、List、BlockingQueue)

 碧海山城 2010-10-27

Java庫本身就有多種線程安全的容器和同步工具,其中同步容器包括兩部分:一個(gè)是VectorHashtable。另外還有JDK1.2中加入的同步包裝類,這些類都是由Collections.synchronizedXXX工廠方法。同步容器都是線程安全的,但是對于復(fù)合操作,缺有些缺點(diǎn):

① 迭代:在查覺到容器在迭代開始以后被修改,會(huì)拋出一個(gè)未檢查異常ConcurrentModificationException,為了避免這個(gè)異常,需要在迭代期間,持有一個(gè)容器鎖。但是鎖的缺點(diǎn)也很明顯,就是對性能的影響。

② 隱藏迭代器:StringBuildertoString方法會(huì)通過迭代容器中的每個(gè)元素,另外容器的hashCodeequals方法也會(huì)間接地調(diào)用迭代。類似地,contailAllremoveAll、retainAll方法,以及容器作為參數(shù)的構(gòu)造函數(shù),都會(huì)對容器進(jìn)行迭代。

③ 缺少即加入等一些復(fù)合操作

public static Object getLast(Vector list) {     

int lastIndex = list.size() - 1;     

return list.get(lastIndex);

}

public static void deleteLast(Vector list) {     

int lastIndex = list.size() - 1;     

list.remove(lastIndex);

}

getLastdeleteLast都是復(fù)合操作,由先前對原子性的分析可以判斷,這依然存在線程安全問題,有可能會(huì)拋出ArrayIndexOutOfBoundsException的異常,錯(cuò)誤產(chǎn)生的邏輯如下所示:


解決辦法就是通過對這些復(fù)合操作加鎖

并發(fā)容器類

正是由于同步容器類有以上問題,導(dǎo)致這些類成了雞肋,于是Java 5推出了并發(fā)容器類,Map對應(yīng)的有ConcurrentHashMap,List對應(yīng)的有CopyOnWriteArrayList。與同步容器類相比,它有以下特性:

1.1 ConcurrentHashMap

· 更加細(xì)化的鎖機(jī)制。同步容器直接把容器對象做為鎖,這樣就把所有操作串行化,其實(shí)這是沒必要的,過于悲觀,而并發(fā)容器采用更細(xì)粒度的鎖機(jī)制,名叫分離鎖,保證一些不會(huì)發(fā)生并發(fā)問題的操作進(jìn)行并行執(zhí)行

· 附加了一些原子性的復(fù)合操作。比如putIfAbsent方法

· 迭代器的弱一致性,而非“及時(shí)失敗”。它在迭代過程中不再拋出Concurrentmodificationexception異常,而是弱一致性。

· 在并發(fā)高的情況下,有可能sizeisEmpty方法不準(zhǔn)確,但真正在并發(fā)環(huán)境下這些方法也沒什么作用。

· 另外,它還有一些附加的原子操作,缺少即加入、相等便移除、相等便替換。

putIfAbsent(K key, V value),缺少即加入(如果該鍵已經(jīng)存在,則不加入)
          如果指定鍵已經(jīng)不再與某個(gè)值相關(guān)聯(lián),則將它與給定值關(guān)聯(lián)。

類似于下面的操作

If(!map.containsKey(key)){

return map.put(key,value);

}else{

return map.get(key);

}

remove(Object key, Object value),相等便移除
          只有目前將鍵的條目映射到給定值時(shí),才移除該鍵的條目。

類似于下面的:

if(map.containsKey(key) && map.get(key).equals(value)){

Map.remove();

return true;

}else{

return false;

}

replace(K key, V value)

replace(K key, V oldValue, V newValue),相等便替換。
          只有目前將鍵的條目映射到某一值時(shí),才替換該鍵的條目。

上面提到的三個(gè),都是原子的。在一些緩存應(yīng)用中可以考慮代替HashMap/Hashtable。

1.2 CopyOnWriteArrayListCopyOnWriteArraySet

· CopyOnWriteArrayList采用寫入時(shí)復(fù)制的方式避開并發(fā)問題。這其實(shí)是通過冗余和不可變性來解決并發(fā)問題,在性能上會(huì)有比較大的代價(jià),但如果寫入的操作遠(yuǎn)遠(yuǎn)小于迭代和讀操作,那么性能就差別不大了

應(yīng)用它們的場合通常在數(shù)組相對較小,并且遍歷操作的數(shù)量大大超過可變操作的數(shù)量時(shí),這種場合應(yīng)用它們非常好。它們所有可變的操作都是先取得后臺數(shù)組的副本,對副本進(jìn)行更改,然后替換副本,這樣可以保證永遠(yuǎn)不會(huì)拋出ConcurrentModificationException移除。

隊(duì)列

Java中的隊(duì)列接口就是Queue,它有會(huì)拋出異常的add、remove方法,在隊(duì)尾插入元素以及對頭移除元素,還有不會(huì)拋出異常的,對應(yīng)的offer、poll方法。

2.1 LinkedList

List實(shí)現(xiàn)了deque接口以及List接口,可以將它看做是這兩種的任何一種。

Queue queue=new LinkedList();

queue.offer("testone");

queue.offer("testtwo");

queue.offer("testthree");

queue.offer("testfour");

System.out.println(queue.poll()); //testone

2.2 PriorityQueue

一個(gè)基于優(yōu)先級堆(簡單的使用鏈表的話,可能插入的效率會(huì)比較低O(N)的無界優(yōu)先級隊(duì)列。優(yōu)先級隊(duì)列的元素按照其自然順序進(jìn)行排序,或者根據(jù)構(gòu)造隊(duì)列時(shí)提供的 Comparator 進(jìn)行排序,具體取決于所使用的構(gòu)造方法。優(yōu)先級隊(duì)列不允許使用 null 元素。依靠自然順序的優(yōu)先級隊(duì)列還不允許插入不可比較的對象。

queue=new PriorityQueue();

queue.offer("testone");

queue.offer("testtwo");

queue.offer("testthree");

queue.offer("testfour");

System.out.println(queue.poll()); //testfour

2.3 ConcurrentLinkedQueue

基于鏈節(jié)點(diǎn)的,線程安全的隊(duì)列。并發(fā)訪問不需要同步。在隊(duì)列的尾部添加元素,并在頭部刪除他們。所以只要不需要知道隊(duì)列的大小,并發(fā)隊(duì)列就是比較好的選擇。

阻塞隊(duì)列

3.1 生產(chǎn)者和消費(fèi)者模式

生產(chǎn)者和消費(fèi)者模式生產(chǎn)者不需要知道消費(fèi)者的身份或者數(shù)量,甚至根本沒有消費(fèi)者,他們只負(fù)責(zé)把數(shù)據(jù)放入隊(duì)列。類似地,消費(fèi)者也不需要知道生產(chǎn)者是誰,以及是誰給他們安排的工作。

Java知道大家清楚這個(gè)模式的并發(fā)復(fù)雜性,于是乎提供了阻塞隊(duì)列(BlockingQueue)來滿足這個(gè)模式的需求。阻塞隊(duì)列說起來很簡單,就是當(dāng)隊(duì)滿的時(shí)候?qū)懢€程會(huì)等待,直到隊(duì)列不滿的時(shí)候;當(dāng)隊(duì)空的時(shí)候讀線程會(huì)等待,直到隊(duì)不空的時(shí)候。實(shí)現(xiàn)這種模式的方法很多,其區(qū)別也就在于誰的消耗更低和等待的策略更優(yōu)。以LinkedBlockingQueue的具體實(shí)現(xiàn)為例,它的put源碼如下:

public void put(E e) throws InterruptedException {

        if (e == nullthrow new NullPointerException();

        int c = -1;

        final ReentrantLock putLock = this.putLock;

        final AtomicInteger count = this.count;

        putLock.lockInterruptibly();

        try {

            

            try {

                while (count.get() == capacity)

                    notFull.await();

            } catch (InterruptedException ie) {

                notFull.signal(); 

// propagate to a non-interrupted thread

                throw ie;

            }

            insert(e);

            c = count.getAndIncrement();

            if (c + 1 < capacity)

                notFull.signal();

        } finally {

            putLock.unlock();

        }

        if (c == 0)

            signalNotEmpty();

}

撇開其鎖的具體實(shí)現(xiàn),其流程就是我們在操作系統(tǒng)課上學(xué)習(xí)到的標(biāo)準(zhǔn)生產(chǎn)者模式,看來那些枯燥的理論還是有用武之地的。其中,最核心的還是Java的鎖實(shí)現(xiàn),有興趣的朋友可以再進(jìn)一步深究一下。

阻塞隊(duì)列Blocking queue,提供了可阻塞的puttake方法,他們與可定時(shí)的offerpoll方法是等價(jià)Put方法簡化了處理,如果是有界隊(duì)列,那么當(dāng)隊(duì)列滿的時(shí)候,生成者就會(huì)阻塞,從而改消費(fèi)者更多的追趕速度。

 

3.2 ArrayBlockingQueueLinkedBlockingQueue

FIFO的隊(duì)列,與LinkedList(由鏈節(jié)點(diǎn)支持,無界)ArrayList(由數(shù)組支持,有界)相似(Linked有更好的插入和移除性能,Array有更好的查找性能,考慮到阻塞隊(duì)列的特性,移除頭部,加入尾部,兩個(gè)都區(qū)別不大),但是卻擁有比同步List更好的并發(fā)性能。

另外,LinkedList永遠(yuǎn)不會(huì)等待,因?yàn)樗菬o界的。

BlockingQueue<String> queue=new ArrayBlockingQueue<String>(5);

Producer p=new Producer(queue);

Consumer c1=new Consumer(queue);

Consumer c2=new Consumer(queue);

new Thread(p).start();

new Thread(c1).start();

new Thread(c2).start();

/**

 * 生產(chǎn)者

 * @author Administrator

 *

 */

class Producer implements Runnable {

   private final BlockingQueue queue;

   Producer(BlockingQueue q) { queue = q; }

   

   public void run() {

     try {

      for(int i=0;i<100;i++){

      queue.put(produce());

      }

      

     } catch (InterruptedException ex) {}

   }

   

   String produce() {

   String temp=""+(char)('A'+(int)(Math.random()*26));

   System.out.println("produce"+temp);

   return temp;

   }

 }

/**

 * 消費(fèi)者

 * @author Administrator

 *

 */

class Consumer implements Runnable {

   private final BlockingQueue queue;

   Consumer(BlockingQueue q) { queue = q; }

   public void run() {

     try {

      for(int i=0;i<100;i++){

      consume(queue.take());

      }

     } catch (InterruptedException ex) {}

   }

   void consume(Object x) {

   System.out.println("cousume"+x.toString());

   }

 }

輸出:

produceK

cousumeK

produceV

cousumeV

produceQ

cousumeQ

produceI

produceD

produceI

produceG

produceA

produceE

cousumeD

3.3 PriorityBlockingQueue

一個(gè)按優(yōu)先級堆支持的無界優(yōu)先級隊(duì)列隊(duì)列,如果不希望按照FIFO的順序進(jìn)行處理,它非常有用。它可以比較元素本身的自然順序,也可以使用一個(gè)Comparator排序。

3.4 DelayQueue

一個(gè)優(yōu)先級堆支持的,基于時(shí)間的調(diào)度隊(duì)列。加入到隊(duì)列中的元素必須實(shí)現(xiàn)新的Delayed接口(只有一個(gè)方法,Long getDelay(java.util.concurrent.TimeUnit unit)),添加可以理立即返回,但是在延遲時(shí)間過去之前,不能從隊(duì)列中取出元素,如果多個(gè)元素的延遲時(shí)間已到,那么最早失效鏈接/失效時(shí)間最長的元素將第一個(gè)取出。

static class NanoDelay implements Delayed{

long tigger;

NanoDelay(long i){

tigger=System.nanoTime()+i;

}

public boolean equals(Object other){

return ((NanoDelay)other).tigger==tigger;

}

/**

 * 返回此對象相關(guān)的剩余延遲時(shí)間,零或負(fù)值指示延遲時(shí)間已經(jīng)用盡

 */

public long getDelay(TimeUnit unit) {

long n=tigger-System.nanoTime();

return unit.convert(n, TimeUnit.NANOSECONDS);

}

public long getTriggerTime(){

return tigger;

}

/**

 * 相互比較,看誰的實(shí)效時(shí)間最長,誰先出去

 */

public int compareTo(Delayed o) {

long i=tigger;

long j=((NanoDelay)o).tigger;

if(i<j){

return -1;

}

if(i>j)

return 1;

return 0;

}

}

public static void main(String[] args) throws InterruptedException{

Random random=new Random();

DelayQueue<NanoDelay> queue=new DelayQueue<NanoDelay>();

for(int i=0;i<5;i++){

queue.add(new NanoDelay(random.nextInt(1000)));

}

long last=0;

for(int i=0;i<5;i++){

NanoDelay delay=(NanoDelay)(queue.take());

long tt=delay.getTriggerTime();

System.out.println("Trigger time:"+tt);

if(i!=0){

System.out.println("Data: "+(tt-last));

}

last=tt;

}

}

3.5 SynchronousQueue

不是一個(gè)真正的隊(duì)列,因?yàn)樗粫?huì)為隊(duì)列元素維護(hù)任何存儲空間,不過它維護(hù)一個(gè)排隊(duì)的線程清單,這些線程等待把元素加入(enqueue)隊(duì)列或者移出(dequeue)隊(duì)列。也就是說,它非常直接的移交工作,減少了生產(chǎn)者和消費(fèi)者之間移動(dòng)數(shù)據(jù)的延遲時(shí)間,另外,也可以更快的知道反饋信息,當(dāng)移交被接受時(shí),它就知道消費(fèi)者已經(jīng)得到了任務(wù)。

因?yàn)?font face="Times New Roman">SynChronousQueue沒有存儲的能力,所以除非另一個(gè)線程已經(jīng)做好準(zhǔn)備,否則puttake會(huì)一直阻止。它只有在消費(fèi)者比較充足的時(shí)候比較合適。

雙端隊(duì)列Deque

JAVA6中新增了兩個(gè)容器DequeBlockingDeque,他們分別擴(kuò)展了QueueBlockingQueue。Deque它是一個(gè)雙端隊(duì)列,允許高效的在頭和尾分別進(jìn)行插入和刪除,它的實(shí)現(xiàn)分別是ArrayDequeLinkedBlockingQueue。

雙端隊(duì)列使得他們能夠工作在一種稱為“竊取工作”的模式上面。

最佳實(shí)踐

1..同步的(synchronized+HashMap,如果不存在,則計(jì)算,然后加入,該方法需要同步。

HashMap cache=new HashMap();

public synchronized V compute(A arg){

V result=cace.get(arg);

Ii(result==null){

result=c.compute(arg);

Cache.put(result);

}

Return result;

}

2.ConcurrentHashMap代替HashMap+同步.,這樣的在getset的時(shí)候也基本能保證原子性。但是會(huì)帶來重復(fù)計(jì)算的問題.

Map<A,V> cache=new ConcurrentHashMap<A,V>();

public  V compute(A arg){

V result=cace.get(arg);

Ii(result==null){

result=c.compute(arg);

Cache.put(result);

}

Return result;

}

3.采用FutureTask代替直接存儲值,這樣可以在一開始創(chuàng)建的時(shí)候就將Task加入

Map<A,FutureTask<V>> cache=new ConcurrentHashMap<A,FutureTask<V>>();

public  V compute(A arg){

FutureTask <T> f=cace.get(arg);

//檢查再運(yùn)行的缺陷

Ii(f==null){

Callable<V> evel=new Callable(){

Public V call() throws ..{

return c.compute(arg);

}

};

FutureTask <T> ft=new FutureTask<T>(evel);

f=ft;

cache.put(arg,ft;

ft.run();

}

Try{

//阻塞,直到完成

return f.get();

}cach(){

}

}

4.上面還有檢查再運(yùn)行的缺陷,在高并發(fā)的情況下啊,雙方都沒發(fā)現(xiàn)FutureTask,并且都放入Map(后一個(gè)被前一個(gè)替代),都開始了計(jì)算。

這里的解決方案在于,當(dāng)他們都要放入Map的時(shí)候,如果可以有原子方法,那么已經(jīng)有了以后,后一個(gè)FutureTask就加入,并且啟動(dòng)。

public  V compute(A arg){

FutureTask <T> f=cace.get(arg);

//檢查再運(yùn)行的缺陷

Ii(f==null){

Callable<V> evel=new Callable(){

Public V call() throws ..{

return c.compute(arg);

}

};

FutureTask <T> ft=new FutureTask<T>(evel);

f=cache.putIfAbsent(args,ft); //如果已經(jīng)存在,返回存在的值,否則返回null

if(f==null){f=ft;ft.run();} //以前不存在,說明應(yīng)該開始這個(gè)計(jì)算

else{ft=null;} //取消該計(jì)算

}

Try{

//阻塞,直到完成

return f.get();

}cach(){

}

}

5.上面的程序上來看已經(jīng)完美了,不過可能帶來緩存污染的可能性。如果一個(gè)計(jì)算被取消或者失敗,那么這個(gè)鍵以后的值永遠(yuǎn)都是失敗了;一種解決方案是,發(fā)現(xiàn)取消或者失敗的task,就移除它,如果有Exception,也移除。

6.另外,如果考慮緩存過期的問題,可以為每個(gè)結(jié)果關(guān)聯(lián)一個(gè)過去時(shí)間,并周期性的掃描,清除過期的緩存。(過期時(shí)間可以用Delayed接口實(shí)現(xiàn),參考DelayQueue,給他一個(gè)大于當(dāng)前時(shí)間XXX的時(shí)間,,并且不斷減去當(dāng)前時(shí)間,直到返回負(fù)數(shù),說明延遲時(shí)間已到了。

7.

    本站是提供個(gè)人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多

    九九热国产这里只有精品 | 国产亚洲精品俞拍视频福利区| 麻豆视传媒短视频免费观看 | 国产一区二区三区av在线| 超碰在线免费公开中国黄片| 日本黄色录像韩国黄色录像| 亚洲日本加勒比在线播放| 中文字幕禁断介一区二区| 亚洲欧美日本国产有色| 午夜精品黄片在线播放| 一级欧美一级欧美在线播| 草草视频精品在线观看| 一区二区免费视频中文乱码国产| 国产精品欧美一区二区三区不卡| 国产av一区二区三区久久不卡| 亚洲第一香蕉视频在线| 色涩一区二区三区四区| 久久三级国外久久久三级| 日本精品中文字幕人妻| 中文字幕乱子论一区二区三区| 美国黑人一级黄色大片| 日韩免费成人福利在线| 日韩亚洲激情在线观看| 精品少妇人妻av免费看| 国产午夜精品在线免费看| 国产日韩精品激情在线观看| 在线观看视频日韩成人| 亚洲精品中文字幕熟女| 国产专区亚洲专区久久| 久久少妇诱惑免费视频| 亚洲中文字幕在线视频频道| 国产成人精品一区在线观看| 国产精品免费福利在线| 日韩1区二区三区麻豆| 不卡中文字幕在线免费看| 午夜福利国产精品不卡| 丰满少妇被猛烈撞击在线视频| 在线观看欧美视频一区| 最近日韩在线免费黄片| 午夜午夜精品一区二区| 99热在线精品视频观看|