Java庫本身就有多種線程安全的容器和同步工具,其中同步容器包括兩部分:一個(gè)是Vector和Hashtable。另外還有JDK1.2中加入的同步包裝類,這些類都是由Collections.synchronizedXXX工廠方法。同步容器都是線程安全的,但是對于復(fù)合操作,缺有些缺點(diǎn): ① 迭代:在查覺到容器在迭代開始以后被修改,會(huì)拋出一個(gè)未檢查異常ConcurrentModificationException,為了避免這個(gè)異常,需要在迭代期間,持有一個(gè)容器鎖。但是鎖的缺點(diǎn)也很明顯,就是對性能的影響。 ② 隱藏迭代器:StringBuilder的toString方法會(huì)通過迭代容器中的每個(gè)元素,另外容器的hashCode和equals方法也會(huì)間接地調(diào)用迭代。類似地,contailAll、removeAll、retainAll方法,以及容器作為參數(shù)的構(gòu)造函數(shù),都會(huì)對容器進(jìn)行迭代。 ③ 缺少即加入等一些復(fù)合操作 public static Object getLast(Vector list) { int lastIndex = list.size() - 1; return list.get(lastIndex); } public static void deleteLast(Vector list) { int lastIndex = list.size() - 1; list.remove(lastIndex); } getLast和deleteLast都是復(fù)合操作,由先前對原子性的分析可以判斷,這依然存在線程安全問題,有可能會(huì)拋出ArrayIndexOutOfBoundsException的異常,錯(cuò)誤產(chǎn)生的邏輯如下所示: 解決辦法就是通過對這些復(fù)合操作加鎖 1 并發(fā)容器類
正是由于同步容器類有以上問題,導(dǎo)致這些類成了雞肋,于是Java 5推出了并發(fā)容器類,Map對應(yīng)的有ConcurrentHashMap,List對應(yīng)的有CopyOnWriteArrayList。與同步容器類相比,它有以下特性: 1.1 ConcurrentHashMap
· 更加細(xì)化的鎖機(jī)制。同步容器直接把容器對象做為鎖,這樣就把所有操作串行化,其實(shí)這是沒必要的,過于悲觀,而并發(fā)容器采用更細(xì)粒度的鎖機(jī)制,名叫分離鎖,保證一些不會(huì)發(fā)生并發(fā)問題的操作進(jìn)行并行執(zhí)行 · 附加了一些原子性的復(fù)合操作。比如putIfAbsent方法 · 迭代器的弱一致性,而非“及時(shí)失敗”。它在迭代過程中不再拋出Concurrentmodificationexception異常,而是弱一致性。 · 在并發(fā)高的情況下,有可能size和isEmpty方法不準(zhǔn)確,但真正在并發(fā)環(huán)境下這些方法也沒什么作用。 · 另外,它還有一些附加的原子操作,缺少即加入、相等便移除、相等便替換。 putIfAbsent(K key, V value),缺少即加入(如果該鍵已經(jīng)存在,則不加入) 類似于下面的操作 If(!map.containsKey(key)){ return map.put(key,value); }else{ return map.get(key); } remove(Object key, Object value),相等便移除 類似于下面的: if(map.containsKey(key) && map.get(key).equals(value)){ Map.remove(); return true; }else{ return false; } replace(K key, V oldValue, V newValue),相等便替換。 上面提到的三個(gè),都是原子的。在一些緩存應(yīng)用中可以考慮代替HashMap/Hashtable。 1.2 CopyOnWriteArrayList和CopyOnWriteArraySet
· CopyOnWriteArrayList采用寫入時(shí)復(fù)制的方式避開并發(fā)問題。這其實(shí)是通過冗余和不可變性來解決并發(fā)問題,在性能上會(huì)有比較大的代價(jià),但如果寫入的操作遠(yuǎn)遠(yuǎn)小于迭代和讀操作,那么性能就差別不大了。 應(yīng)用它們的場合通常在數(shù)組相對較小,并且遍歷操作的數(shù)量大大超過可變操作的數(shù)量時(shí),這種場合應(yīng)用它們非常好。它們所有可變的操作都是先取得后臺數(shù)組的副本,對副本進(jìn)行更改,然后替換副本,這樣可以保證永遠(yuǎn)不會(huì)拋出ConcurrentModificationException移除。 2 隊(duì)列
Java中的隊(duì)列接口就是Queue,它有會(huì)拋出異常的add、remove方法,在隊(duì)尾插入元素以及對頭移除元素,還有不會(huì)拋出異常的,對應(yīng)的offer、poll方法。 2.1 LinkedList
List實(shí)現(xiàn)了deque接口以及List接口,可以將它看做是這兩種的任何一種。 Queue queue=new LinkedList(); queue.offer("testone"); queue.offer("testtwo"); queue.offer("testthree"); queue.offer("testfour"); System.out.println(queue.poll()); //testone 2.2 PriorityQueue
一個(gè)基于優(yōu)先級堆(簡單的使用鏈表的話,可能插入的效率會(huì)比較低O(N))的無界優(yōu)先級隊(duì)列。優(yōu)先級隊(duì)列的元素按照其自然順序進(jìn)行排序,或者根據(jù)構(gòu)造隊(duì)列時(shí)提供的 Comparator 進(jìn)行排序,具體取決于所使用的構(gòu)造方法。優(yōu)先級隊(duì)列不允許使用 null 元素。依靠自然順序的優(yōu)先級隊(duì)列還不允許插入不可比較的對象。 queue=new PriorityQueue(); queue.offer("testone"); queue.offer("testtwo"); queue.offer("testthree"); queue.offer("testfour"); System.out.println(queue.poll()); //testfour 2.3 ConcurrentLinkedQueue
基于鏈節(jié)點(diǎn)的,線程安全的隊(duì)列。并發(fā)訪問不需要同步。在隊(duì)列的尾部添加元素,并在頭部刪除他們。所以只要不需要知道隊(duì)列的大小,并發(fā)隊(duì)列就是比較好的選擇。 3 阻塞隊(duì)列
3.1 生產(chǎn)者和消費(fèi)者模式
生產(chǎn)者和消費(fèi)者模式,生產(chǎn)者不需要知道消費(fèi)者的身份或者數(shù)量,甚至根本沒有消費(fèi)者,他們只負(fù)責(zé)把數(shù)據(jù)放入隊(duì)列。類似地,消費(fèi)者也不需要知道生產(chǎn)者是誰,以及是誰給他們安排的工作。 而Java知道大家清楚這個(gè)模式的并發(fā)復(fù)雜性,于是乎提供了阻塞隊(duì)列(BlockingQueue)來滿足這個(gè)模式的需求。阻塞隊(duì)列說起來很簡單,就是當(dāng)隊(duì)滿的時(shí)候?qū)懢€程會(huì)等待,直到隊(duì)列不滿的時(shí)候;當(dāng)隊(duì)空的時(shí)候讀線程會(huì)等待,直到隊(duì)不空的時(shí)候。實(shí)現(xiàn)這種模式的方法很多,其區(qū)別也就在于誰的消耗更低和等待的策略更優(yōu)。以LinkedBlockingQueue的具體實(shí)現(xiàn)為例,它的put源碼如下: public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { try { while (count.get() == capacity) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to a non-interrupted thread throw ie; } insert(e); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } 撇開其鎖的具體實(shí)現(xiàn),其流程就是我們在操作系統(tǒng)課上學(xué)習(xí)到的標(biāo)準(zhǔn)生產(chǎn)者模式,看來那些枯燥的理論還是有用武之地的。其中,最核心的還是Java的鎖實(shí)現(xiàn),有興趣的朋友可以再進(jìn)一步深究一下。 阻塞隊(duì)列Blocking queue,提供了可阻塞的put和take方法,他們與可定時(shí)的offer和poll方法是等價(jià)。Put方法簡化了處理,如果是有界隊(duì)列,那么當(dāng)隊(duì)列滿的時(shí)候,生成者就會(huì)阻塞,從而改消費(fèi)者更多的追趕速度。 3.2 ArrayBlockingQueue和LinkedBlockingQueue
FIFO的隊(duì)列,與LinkedList(由鏈節(jié)點(diǎn)支持,無界)和ArrayList(由數(shù)組支持,有界)相似(Linked有更好的插入和移除性能,Array有更好的查找性能,考慮到阻塞隊(duì)列的特性,移除頭部,加入尾部,兩個(gè)都區(qū)別不大),但是卻擁有比同步List更好的并發(fā)性能。 另外,LinkedList永遠(yuǎn)不會(huì)等待,因?yàn)樗菬o界的。 BlockingQueue<String> queue=new ArrayBlockingQueue<String>(5); Producer p=new Producer(queue); Consumer c1=new Consumer(queue); Consumer c2=new Consumer(queue); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); /** * 生產(chǎn)者 * @author Administrator * */ class Producer implements Runnable { private final BlockingQueue queue; Producer(BlockingQueue q) { queue = q; } public void run() { try { for(int i=0;i<100;i++){ queue.put(produce()); } } catch (InterruptedException ex) {} } String produce() { String temp=""+(char)('A'+(int)(Math.random()*26)); System.out.println("produce"+temp); return temp; } } /** * 消費(fèi)者 * @author Administrator * */ class Consumer implements Runnable { private final BlockingQueue queue; Consumer(BlockingQueue q) { queue = q; } public void run() { try { for(int i=0;i<100;i++){ consume(queue.take()); } } catch (InterruptedException ex) {} } void consume(Object x) { System.out.println("cousume"+x.toString()); } } 輸出: produceK cousumeK produceV cousumeV produceQ cousumeQ produceI produceD produceI produceG produceA produceE cousumeD 3.3 PriorityBlockingQueue
一個(gè)按優(yōu)先級堆支持的無界優(yōu)先級隊(duì)列隊(duì)列,如果不希望按照FIFO的順序進(jìn)行處理,它非常有用。它可以比較元素本身的自然順序,也可以使用一個(gè)Comparator排序。 3.4 DelayQueue
一個(gè)優(yōu)先級堆支持的,基于時(shí)間的調(diào)度隊(duì)列。加入到隊(duì)列中的元素必須實(shí)現(xiàn)新的Delayed接口(只有一個(gè)方法,Long getDelay(java.util.concurrent.TimeUnit unit)),添加可以理立即返回,但是在延遲時(shí)間過去之前,不能從隊(duì)列中取出元素,如果多個(gè)元素的延遲時(shí)間已到,那么最早失效鏈接/失效時(shí)間最長的元素將第一個(gè)取出。 static class NanoDelay implements Delayed{ long tigger; NanoDelay(long i){ tigger=System.nanoTime()+i; } public boolean equals(Object other){ return ((NanoDelay)other).tigger==tigger; } /** * 返回此對象相關(guān)的剩余延遲時(shí)間,零或負(fù)值指示延遲時(shí)間已經(jīng)用盡 */ public long getDelay(TimeUnit unit) { long n=tigger-System.nanoTime(); return unit.convert(n, TimeUnit.NANOSECONDS); } public long getTriggerTime(){ return tigger; } /** * 相互比較,看誰的實(shí)效時(shí)間最長,誰先出去 */ public int compareTo(Delayed o) { long i=tigger; long j=((NanoDelay)o).tigger; if(i<j){ return -1; } if(i>j) return 1; return 0; } } public static void main(String[] args) throws InterruptedException{ Random random=new Random(); DelayQueue<NanoDelay> queue=new DelayQueue<NanoDelay>(); for(int i=0;i<5;i++){ queue.add(new NanoDelay(random.nextInt(1000))); } long last=0; for(int i=0;i<5;i++){ NanoDelay delay=(NanoDelay)(queue.take()); long tt=delay.getTriggerTime(); System.out.println("Trigger time:"+tt); if(i!=0){ System.out.println("Data: "+(tt-last)); } last=tt; } } 3.5 SynchronousQueue
不是一個(gè)真正的隊(duì)列,因?yàn)樗粫?huì)為隊(duì)列元素維護(hù)任何存儲空間,不過它維護(hù)一個(gè)排隊(duì)的線程清單,這些線程等待把元素加入(enqueue)隊(duì)列或者移出(dequeue)隊(duì)列。也就是說,它非常直接的移交工作,減少了生產(chǎn)者和消費(fèi)者之間移動(dòng)數(shù)據(jù)的延遲時(shí)間,另外,也可以更快的知道反饋信息,當(dāng)移交被接受時(shí),它就知道消費(fèi)者已經(jīng)得到了任務(wù)。 因?yàn)?font face="Times New Roman">SynChronousQueue沒有存儲的能力,所以除非另一個(gè)線程已經(jīng)做好準(zhǔn)備,否則put和take會(huì)一直阻止。它只有在消費(fèi)者比較充足的時(shí)候比較合適。 4 雙端隊(duì)列(Deque)
JAVA6中新增了兩個(gè)容器Deque和BlockingDeque,他們分別擴(kuò)展了Queue和BlockingQueue。Deque它是一個(gè)雙端隊(duì)列,允許高效的在頭和尾分別進(jìn)行插入和刪除,它的實(shí)現(xiàn)分別是ArrayDeque和LinkedBlockingQueue。 雙端隊(duì)列使得他們能夠工作在一種稱為“竊取工作”的模式上面。 5 最佳實(shí)踐
1..同步的(synchronized)+HashMap,如果不存在,則計(jì)算,然后加入,該方法需要同步。 HashMap cache=new HashMap(); public synchronized V compute(A arg){ V result=cace.get(arg); Ii(result==null){ result=c.compute(arg); Cache.put(result); } Return result; } 2.用ConcurrentHashMap代替HashMap+同步.,這樣的在get和set的時(shí)候也基本能保證原子性。但是會(huì)帶來重復(fù)計(jì)算的問題. Map<A,V> cache=new ConcurrentHashMap<A,V>(); public V compute(A arg){ V result=cace.get(arg); Ii(result==null){ result=c.compute(arg); Cache.put(result); } Return result; } 3.采用FutureTask代替直接存儲值,這樣可以在一開始創(chuàng)建的時(shí)候就將Task加入 Map<A,FutureTask<V>> cache=new ConcurrentHashMap<A,FutureTask<V>>(); public V compute(A arg){ FutureTask <T> f=cace.get(arg); //檢查再運(yùn)行的缺陷 Ii(f==null){ Callable<V> evel=new Callable(){ Public V call() throws ..{ return c.compute(arg); } }; FutureTask <T> ft=new FutureTask<T>(evel); f=ft; cache.put(arg,ft; ft.run(); } Try{ //阻塞,直到完成 return f.get(); }cach(){ } } 4.上面還有檢查再運(yùn)行的缺陷,在高并發(fā)的情況下啊,雙方都沒發(fā)現(xiàn)FutureTask,并且都放入Map(后一個(gè)被前一個(gè)替代),都開始了計(jì)算。 這里的解決方案在于,當(dāng)他們都要放入Map的時(shí)候,如果可以有原子方法,那么已經(jīng)有了以后,后一個(gè)FutureTask就加入,并且啟動(dòng)。 public V compute(A arg){ FutureTask <T> f=cace.get(arg); //檢查再運(yùn)行的缺陷 Ii(f==null){ Callable<V> evel=new Callable(){ Public V call() throws ..{ return c.compute(arg); } }; FutureTask <T> ft=new FutureTask<T>(evel); f=cache.putIfAbsent(args,ft); //如果已經(jīng)存在,返回存在的值,否則返回null if(f==null){f=ft;ft.run();} //以前不存在,說明應(yīng)該開始這個(gè)計(jì)算 else{ft=null;} //取消該計(jì)算 } Try{ //阻塞,直到完成 return f.get(); }cach(){ } } 5.上面的程序上來看已經(jīng)完美了,不過可能帶來緩存污染的可能性。如果一個(gè)計(jì)算被取消或者失敗,那么這個(gè)鍵以后的值永遠(yuǎn)都是失敗了;一種解決方案是,發(fā)現(xiàn)取消或者失敗的task,就移除它,如果有Exception,也移除。 6.另外,如果考慮緩存過期的問題,可以為每個(gè)結(jié)果關(guān)聯(lián)一個(gè)過去時(shí)間,并周期性的掃描,清除過期的緩存。(過期時(shí)間可以用Delayed接口實(shí)現(xiàn),參考DelayQueue,給他一個(gè)大于當(dāng)前時(shí)間XXX的時(shí)間,,并且不斷減去當(dāng)前時(shí)間,直到返回負(fù)數(shù),說明延遲時(shí)間已到了。) 7. |
|