計數(shù)器:CountDownLatch CountDownLatch類似于一個計數(shù)器,和Atomic類比較相近,操作是原子的,即多個線程同時只能有一個可以去操作。CountDownLatch對象設置一個初始的數(shù)字作為計數(shù)值,任何調(diào)用這個對象上的await()方法都會阻塞,直到這個計數(shù)器的計數(shù)值被其他的線程調(diào)用countDown()減為0為止。典型的應用場景就是:有一個任務想要往下執(zhí)行,但必須要等到其他的任務執(zhí)行完畢后才可以繼續(xù)往下執(zhí)行。例如在Zookeeper的使用過程中,由于客戶端與服務器建立連接是異步調(diào)用的,因此主線程需要await()阻塞直至異步回調(diào)countDown()完成。
代碼示例: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | public class CountDownLatchTest {
public static void main(String[] args) {
final CountDownLatch countDownLatch = new CountDownLatch( 2 );
Thread work1 = new Thread( new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread() + " doing work...start" );
try {
Thread.sleep( 200 );
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + " doing work...end " );
countDownLatch.countDown();
}
}, "work1" );
Thread work2 = new Thread( new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " doing work...start" );
try {
Thread.sleep( 200 );
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " doing work...end " );
countDownLatch.countDown();
}
}, "work2" );
work1.start();
work2.start();
try {
countDownLatch.await();
System.out.println( "all workers finish " );
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
|
齊步走:CyclicBarrier
Barrier的意思是柵欄,就是讓一組線程相互等待,直至所有線程都到齊了,那么就可以齊步走。Cyclic是循環(huán)的意思,就是說Barrier可以循環(huán)使用。CyclicBarrier主要的方法就是await(),較CountDownLatch的await()雖然都是阻塞,但是CyclicBarrier.await()有返回值int,即當前線程是第幾個到達這個Barrier的線程。 構(gòu)造CyclicBarrier時指定計數(shù)值,await() 方法每被調(diào)用一次,計數(shù)便會減少1,并阻塞住當前線程。當計數(shù)減至0時,阻塞解除,所有在此 CyclicBarrier 上面阻塞的線程開始運行。在這之后,如果再次調(diào)用 await() 方法,計數(shù)就又會變成 N-1,新一輪重新開始。在構(gòu)造方法上還可以傳遞一個Runnable對象,阻塞解除時這個Runnable會得到運行。 CyclicBarrier有點“不見不散”的味道,想一想,如果某個成員因某種原因來不了Barrier這個地方,那么我們一直等待嗎?實際中,如果來不了理應通知其他成員,別等了,回家吧!注意到CyclicBarrier.await()獨有的BrokenBarrierException異常
代碼示例: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | public class CyclicBarrierTest {
public static void main(String[] args) {
final CyclicBarrier cyclicBarrier = new CyclicBarrier( 2 , new Runnable() {
@Override
public void run() {
System.out.println( "都準備好啦!" );
}
});
Thread runman1 = new Thread( new Runnable() {
@Override
public void run() {
try {
Thread.sleep( 200 );
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + "i am ok" );
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}, "runman1" );
Thread runman2 = new Thread( new Runnable() {
@Override
public void run() {
try {
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + "i am ok" );
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}, "runman2" );
runman1.start();
runman2.start();
}
}
|
Callable And Future
在博主以前的博客《Java Future模式實現(xiàn)》中有介紹Future模式,Future模式非常適合在處理耗時很長的業(yè)務邏輯,可以有效的減少系統(tǒng)的響應時間,提高系統(tǒng)的吞吐量。JDK其實已經(jīng)為我們提供了API實現(xiàn),我們來看一段代碼即可: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | public class FutureTest {
public static void main(String[] args) {
FutureTask<String> futureTask = new FutureTask<String>( new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep( 2000 );
return "ok" ;
}
});
ExecutorService es = Executors.newFixedThreadPool( 1 );
es.submit(futureTask);
System.out.println( "開啟線程去異步處理,主線程繼續(xù)往下執(zhí)行!" );
try {
System.out.println( "取得異步處理結(jié)果:" + futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
|
注意到線程池執(zhí)行任務,可以利用2個方法:
submit和execute有什么區(qū)別呢?從入?yún)⒑徒Y(jié)果類型就知道了。
信號量:Semaphore Semaphore實現(xiàn)的功能就類似廁所有5個坑,假如有10個人要上廁所,那么顯然同時只能有5個人占用廁所,當5個人中的任何一個人讓開后,其中等待的另外5個人中又有一個人可以占用了。另外等待的5個人中可以是隨機獲得優(yōu)先機會,也可以是按照先來后到的順序獲得機會,這取決于構(gòu)造Semaphore對象時傳入的fair參數(shù)選項。 Semaphore可以控制某個資源可被同時訪問的個數(shù)(構(gòu)造方法傳入),通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。
代碼示例: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | public static void main(String[] args) {
final Semaphore semaphore = new Semaphore( 5 );
for ( int i = 0 ; i < 6 ; i++){
new Thread( new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 運行..." );
Thread.sleep( 1000 );
semaphore.release();
System.out.println(Thread.currentThread().getName() + " 結(jié)束..." );
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},String.valueOf(i)).start();
}
}
|
Condition
JDK由原始的synchronized發(fā)展到Lock,以類的方式提供鎖機制,發(fā)展出重入鎖、讀寫鎖,以類的形式存在自然功能更加強大靈活,比如可以tryLock進行鎖的嗅探。在synchronized代碼塊中我們可以使用wait/notify/notifyAll來進行線程的協(xié)同工作,那么JDK也發(fā)展了這一塊,即Condition。Condition.await類似于wait,Condition.signal/signalAll類似于notify/nofityAll。下面我們簡單實現(xiàn)一個Condition版的生產(chǎn)者/消費者。
處理核心:Handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | public class Handler {
//容器
private LinkedList<String> linkedList = new LinkedList<String>();
//限制
private int MAX_SIZE = 3 ;
//鎖
private Lock lock = new ReentrantLock();
//condition 實際上,可以new多個condition,這里暫且只是用給一個
private Condition condition = lock.newCondition();
public void put(String bread){
try {
lock.lock();
if (linkedList.size() == MAX_SIZE){
System.out.println( "容器已滿" );
condition.await();
}
linkedList.add(bread);
System.out.println( "放入面包" + bread);
condition.signalAll();
} catch (Exception e){
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void eat(){
try {
lock.lock();
if (linkedList.size() == 0 ){
System.out.println( "容器為空" );
condition.await();
}
String bread = linkedList.removeFirst();
System.out.println( "吃掉一個面包" + bread);
condition.signalAll();
} catch (Exception e){
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
|
生產(chǎn)者:Produce 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | public class Produce implements Runnable{
private Handler handler;
public Produce(Handler handler) {
this .handler = handler;
}
@Override
public void run() {
for ( int i = 0 ; i < 10 ; i++){
try {
Thread.sleep( new Random().nextInt( 1000 ));
} catch (InterruptedException e) {
e.printStackTrace();
}
handler.put(String.valueOf(i));
}
}
}
|
消費者:Consume 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | public class Consume implements Runnable{
private Handler handler;
public Consume(Handler handler) {
this .handler = handler;
}
@Override
public void run() {
for ( int i = 0 ; i < 10 ; i++){
try {
Thread.sleep( new Random().nextInt( 1000 ));
} catch (InterruptedException e) {
e.printStackTrace();
}
handler.eat();
}
}
}
|
Main: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | public class Main {
public static void main(String[] args) {
Handler handler = new Handler();
Produce produce = new Produce(handler);
Consume consume = new Consume(handler);
new Thread(consume).start();
new Thread(produce).start();
new Thread(produce).start();
}
}
|
本文出自 “學海無涯 心境無限” 博客,請務必保留此出處http://zhangfengzhe.blog.51cto.com/8855103/1883655
|