一区二区三区日韩精品-日韩经典一区二区三区-五月激情综合丁香婷婷-欧美精品中文字幕专区

分享

ddsdcx

 昵稱9574 2006-07-04

13: 并發(fā)編程

面向?qū)ο笫刮覀兡軐⒊绦騽澐殖上嗷オ?dú)立的模塊。但是你時(shí)常還會(huì)碰到,不但要把程序分解開來(lái),而且還要讓它的各個(gè)部分都能獨(dú)立運(yùn)行的問(wèn)題。

這種能獨(dú)立運(yùn)行的子任務(wù)就是線程(thread)。編程的時(shí)候,你可以認(rèn)為線程都是能獨(dú)立運(yùn)行的,有自己CPU的子任務(wù)。實(shí)際上,是一些底層機(jī)制在為你分割CPU的時(shí)間,只是你不知道罷了。這種做法能簡(jiǎn)化多線程的編程。

進(jìn)程(process)是一種有專屬地址空間的"自含式(self-contained)"程序。通過(guò)在不同的任務(wù)之間定時(shí)切換CPU,多任務(wù)(multitasking)操作系統(tǒng)營(yíng)造出一種同一個(gè)時(shí)點(diǎn)可以有多個(gè)進(jìn)程(程序)在同時(shí)運(yùn)行的效果。線程是進(jìn)程內(nèi)部的獨(dú)立的,有序的指令流。由此,一個(gè)進(jìn)程能包含多個(gè)并發(fā)執(zhí)行的線程。

多線程的用途很廣,但歸納起來(lái)不外乎,程序的某一部分正在等一個(gè)事件或資源,而你又不想讓它把整個(gè)程序都給阻塞了。因此你可以創(chuàng)建一個(gè)與該事件或資源相關(guān)的線程,讓它與主程序分開來(lái)運(yùn)行。

學(xué)習(xí)并發(fā)編程就像是去探訪一個(gè)新的世界,同時(shí)學(xué)習(xí)一種新的編程語(yǔ)言,最起碼也得接受一套新的理念。隨著絕大多數(shù)的微電腦操作系統(tǒng)提供了多線程支持,編程語(yǔ)言和類庫(kù)也做了相應(yīng)的擴(kuò)展。總而言之,多線程編程:

  1. 看上去不但神秘,而且還要求你改變編程的觀念
  2. 各種語(yǔ)言對(duì)多線程的支持大同小異,所以理解線程就等于掌握了一種通用語(yǔ)言

雖然支持多線程會(huì)讓Java變得更復(fù)雜,但這并不全是Java的錯(cuò)——不是Java無(wú)能,而是線程太狡猾了。

理解并發(fā)編程的難度不亞于理解多態(tài)性。如果只想了解其最基本的運(yùn)行機(jī)制,那么只要花一點(diǎn)工夫就行了,但是要想真正做到融會(huì)貫通,那就非得下苦功了。本章的宗旨是,讓你能牢固地掌握并發(fā)編程方面的基礎(chǔ)知識(shí),寫出較為合理的多線程程序。要提醒你,多線程看著容易其實(shí)很難,因此你很可能會(huì)過(guò)于自信了。所以如果你要寫一些比較復(fù)雜的東西,務(wù)必先去研究一下這方面的專著。

動(dòng)機(jī)

并發(fā)編程的一個(gè)最主要的用途就是創(chuàng)建反應(yīng)靈敏的用戶界面。試想有這么一個(gè)程序,由于要進(jìn)行大量的CPU密集的運(yùn)算,它完全忽略了用戶輸入,以致于變得非常遲鈍了。要解決這種問(wèn)題,關(guān)鍵在于,程序在進(jìn)行運(yùn)算的同時(shí),還要時(shí)不時(shí)地將控制權(quán)交還給用戶界面,這樣才能對(duì)用戶的操作做出及時(shí)的響應(yīng)。假設(shè)有一個(gè)"quit"按鈕,你總不會(huì)希望每寫一段代碼就做一次輪詢的吧,你要的是"quit"能及時(shí)響應(yīng)用戶的操作,就像你在定時(shí)檢查一樣。

常規(guī)的方法是不可能在運(yùn)行指令的同時(shí)還把控制權(quán)交給其他程序的。這聽上去簡(jiǎn)直就是在天方夜譚,就好像CPU能同時(shí)出現(xiàn)在兩個(gè)地方,但是多線程所營(yíng)造的正是這個(gè)效果。

并發(fā)編程還能用來(lái)優(yōu)化吞吐率。比方說(shuō),你可以一邊在等數(shù)據(jù),一邊做其他事情。要是沒(méi)有多線程,唯一可行的辦法就是作輪詢,但是這種做法不但笨拙而且很難。

如果是多處理器的系統(tǒng),線程還會(huì)被分到多個(gè)處理器上,這樣就能大大提高(指令的)吞吐率了。通常多處理器的Web服務(wù)器就是這樣做的,它會(huì)為每個(gè)請(qǐng)求分配一個(gè)線程,這樣就把多個(gè)請(qǐng)求分?jǐn)偨o各個(gè)CPU了。

有一點(diǎn)要記住,那就是多線程程序也必須能運(yùn)行在單CPU系統(tǒng)上。因此,即便不用多線程也應(yīng)該能寫出相同的程序。但是多線程提供了一種非常重要的,程序結(jié)構(gòu)方面的優(yōu)勢(shì),因而能大大簡(jiǎn)化軟件的設(shè)計(jì)。對(duì)于像仿真模擬之類的視頻游戲,如果不用多線程,真不知道該怎么解決。

多線程模型大大簡(jiǎn)化了"讓一個(gè)程序同時(shí)做幾件事"這個(gè)難題。在多線程環(huán)境下,CPU會(huì)不斷地在線程之間進(jìn)行切換,給它們分配時(shí)間。在線程看來(lái),它能隨時(shí)獲得CPU,但實(shí)際上CPU的時(shí)間已經(jīng)被分配給了所有線程。這里有個(gè)例外,如果程序是運(yùn)行在多CPU的系統(tǒng)上,那么線程就真有可能獨(dú)占整個(gè)CPU了。但是多線程最值得稱道的還是它的底層抽象,即代碼無(wú)需知道它是運(yùn)行在單CPU還是多CPU的系統(tǒng)上。由此,你可以用多線程來(lái)創(chuàng)建可透明擴(kuò)展的程序——如果程序運(yùn)行得太慢了,直接在機(jī)器上加CPU就行了。多任務(wù)與多線程是充分利用多處理器系統(tǒng)的好辦法。

在單CPU系統(tǒng)里,線程會(huì)多少影響程序的運(yùn)算效率,但是通算下來(lái),它在程序結(jié)構(gòu),平衡資源以及用戶的舒適度方面的優(yōu)勢(shì)還是遠(yuǎn)遠(yuǎn)超過(guò)了性能方面的損失??傊?,多線程能令你設(shè)計(jì)出更為松散耦合(more loosely-coupled)的應(yīng)用程序;否則,你就得多寫很多代碼,把那些本應(yīng)交由多線程去處理的事情攬到自己頭上了。

基本線程

要想創(chuàng)建線程,最簡(jiǎn)單的辦法就是繼承java.lang.Thread。這個(gè)類已經(jīng)為線程的創(chuàng)建和運(yùn)行做了必要的配置。run( )Thread最重要的方法,要想讓線程替你辦事,你就必須覆寫這個(gè)方法。由此可知,run( )所包含的就是要和程序里其它線程"同時(shí)"執(zhí)行的代碼。

下面這段程序會(huì)創(chuàng)建五個(gè)線程,每個(gè)線程都包含一個(gè)由static變量生成的唯一標(biāo)識(shí)符。我們覆寫了Threadrun( )方法,讓它每做一次循環(huán)就減一,并且在數(shù)到零的時(shí)候返回(什么時(shí)候run( )返回了,線程也就中止了)。

//: c13:SimpleThread.java
// Very simple Threading example.
import com.bruceeckel.simpletest.*;
public class SimpleThread extends Thread {
private static Test monitor = new Test();
private int countDown = 5;
private static int threadCount = 0;
public SimpleThread() {
super("" + ++threadCount); // Store the thread name
start();
}
public String toString() {
return "#" + getName() + ": " + countDown;
}
public void run() {
while(true) {
System.out.println(this);
if(--countDown == 0) return;
}
}
public static void main(String[] args) {
for(int i = 0; i < 5; i++)
new SimpleThread();
monitor.expect(new String[] {
"#1: 5",
"#2: 5",
"#3: 5",
"#5: 5",
"#1: 4",
"#4: 5",
"#2: 4",
"#3: 4",
"#5: 4",
"#1: 3",
"#4: 4",
"#2: 3",
"#3: 3",
"#5: 3",
"#1: 2",
"#4: 3",
"#2: 2",
"#3: 2",
"#5: 2",
"#1: 1",
"#4: 2",
"#2: 1",
"#3: 1",
"#5: 1",
"#4: 1"
}, Test.IGNORE_ORDER + Test.WAIT);
}
} ///:~

我們用Thread的構(gòu)造函數(shù)給線程起名字。接著又在toString( )里面用getName( )把這個(gè)名字提取出來(lái)。

實(shí)際上Threadrun( )里面總是會(huì)有循環(huán),這些循環(huán)會(huì)一直運(yùn)行下去,直到線程結(jié)束。所以你必須設(shè)定條件打破循環(huán)(或者像上面那樣,直接在run( )里面return)。通常run( )是個(gè)無(wú)限循環(huán),也就是說(shuō),刨開那些會(huì)讓run( )停下來(lái)的意外情況,線程會(huì)一直運(yùn)行下去(本章的后面會(huì)講怎樣安全地通知線程停下來(lái))。

main( )創(chuàng)建并啟動(dòng)了多個(gè)線程。Threadstart( )方法會(huì)先對(duì)線程做一些初始化,再調(diào)用run( )。所以整個(gè)步驟應(yīng)該是:調(diào)用構(gòu)造函數(shù)創(chuàng)建一個(gè)對(duì)象,并且在構(gòu)造函數(shù)里面調(diào)用start( )來(lái)配置這個(gè)線程,然后讓線程的執(zhí)行機(jī)制去調(diào)用run( )。如果你不調(diào)用start( )(你也可以這么做,后面會(huì)舉例的),那么線程永遠(yuǎn)也不會(huì)啟動(dòng)。

線程的調(diào)度機(jī)制是非決定性,因此程序每次運(yùn)行的時(shí)候,輸出都不一樣。實(shí)際上,這么簡(jiǎn)單的一個(gè)程序,在不同版本的JDK里,輸出也是天差地別。比方說(shuō)早期的JDK不能把時(shí)間片分得很細(xì),因此線程1會(huì)第一個(gè)結(jié)束,接著是線程2,線程3,等等。實(shí)際上這跟調(diào)用立即進(jìn)入循環(huán)的子程序沒(méi)什么兩樣,只是啟動(dòng)線程的代價(jià)更高了。但是在JDK 1.4里,你會(huì)發(fā)現(xiàn)程序輸出會(huì)比較接近SimpleThread.java所給出的,這表示線程調(diào)度機(jī)制能更好地進(jìn)行時(shí)間分片了——這樣每個(gè)線程就都能經(jīng)常得到服務(wù)了。通常Sun是不會(huì)告訴你這種JDK運(yùn)行方式的變化的,所以別去指望線程的運(yùn)行方式會(huì)始終如一。寫多線程程序的時(shí)候,最好還是保守一些。

main( )創(chuàng)建了Thread,但是卻沒(méi)去拿它的reference。如果是普通對(duì)象,這一點(diǎn)就足以讓它成為垃圾,但Thread不會(huì)。Thread都會(huì)為它自己"注冊(cè)",所以實(shí)際上reference還保留在某個(gè)地方。除非run( )退出,線程中止,否則垃圾回收器不能動(dòng)它。

Yielding

如果你知道run( )已經(jīng)告一段落了,你就可以給線程調(diào)度機(jī)制作一個(gè)暗示,告訴它你干完了,可以讓別的線程來(lái)使用CPU了。這個(gè)暗示(注意,只是暗示——無(wú)法保證你用的這個(gè)JVM會(huì)不會(huì)對(duì)此作出反映)是用yield( )形式給出的。

下面我們就用這個(gè)辦法來(lái)修改程序:

//: c13:YieldingThread.java
// Suggesting when to switch threads with yield().
import com.bruceeckel.simpletest.*;
public class YieldingThread extends Thread {
private static Test monitor = new Test();
private int countDown = 5;
private static int threadCount = 0;
public YieldingThread() {
super("" + ++threadCount);
start();
}
public String toString() {
return "#" + getName() + ": " + countDown;
}
public void run() {
while(true) {
System.out.println(this);
if(--countDown == 0) return;
yield();
}
}
public static void main(String[] args) {
for(int i = 0; i < 5; i++)
new YieldingThread();
monitor.expect(new String[] {
"#1: 5",
"#2: 5",
"#4: 5",
"#5: 5",
"#3: 5",
"#1: 4",
"#2: 4",
"#4: 4",
"#5: 4",
"#3: 4",
"#1: 3",
"#2: 3",
"#4: 3",
"#5: 3",
"#3: 3",
"#1: 2",
"#2: 2",
"#4: 2",
"#5: 2",
"#3: 2",
"#1: 1",
"#2: 1",
"#4: 1",
"#5: 1",
"#3: 1"
}, Test.IGNORE_ORDER + Test.WAIT);
}
} ///:~

用了yield( )之后,程序的輸出變得更整齊了。但是如果字符串更長(zhǎng)一些的話,效果就會(huì)同SimpleThread.java的差不多了(試試看,逐步增加toString( )字符串的長(zhǎng)度,看看輸出的效果)。Java的線程調(diào)度機(jī)制是搶占式的(preemptive),也就是說(shuō),只要它認(rèn)為有必要,它會(huì)隨時(shí)中斷當(dāng)前線程,并且切換到其它線程。因此,如果I/O(通過(guò)main( )線程執(zhí)行)占用的時(shí)間太長(zhǎng)了,線程調(diào)度機(jī)制就會(huì)在run( )運(yùn)行到yield( )之前把它給停下來(lái)??傊?span id="pqkdfof" class=original_words>yield( )只會(huì)在很少的情況下起作用,而且不能用來(lái)進(jìn)行很嚴(yán)肅的調(diào)校。

Sleeping

還有一種控制線程的辦法,就是用sleep( )讓它停一段以毫秒計(jì)的時(shí)間。如果把上面那段程序里的yield( )換成sleep( ),程序就變成這樣了:

//: c13:SleepingThread.java
// Calling sleep() to wait for awhile.
import com.bruceeckel.simpletest.*;
public class SleepingThread extends Thread {
private static Test monitor = new Test();
private int countDown = 5;
private static int threadCount = 0;
public SleepingThread() {
super("" + ++threadCount);
start();
}
public String toString() {
return "#" + getName() + ": " + countDown;
}
public void run() {
while(true) {
System.out.println(this);
if(--countDown == 0) return;
try {
sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public static void
main(String[] args) throws InterruptedException {
for(int i = 0; i < 5; i++)
new SleepingThread().join();
monitor.expect(new String[] {
"#1: 5",
"#1: 4",
"#1: 3",
"#1: 2",
"#1: 1",
"#2: 5",
"#2: 4",
"#2: 3",
"#2: 2",
"#2: 1",
"#3: 5",
"#3: 4",
"#3: 3",
"#3: 2",
"#3: 1",
"#4: 5",
"#4: 4",
"#4: 3",
"#4: 2",
"#4: 1",
"#5: 5",
"#5: 4",
"#5: 3",
"#5: 2",
"#5: 1"
});
}
} ///:~

sleep( )一定要放在try域里,這是因?yàn)橛锌赡軙?huì)出現(xiàn)時(shí)間沒(méi)到sleep( )就被中斷的情況。如果有人拿到了線程的reference,并且調(diào)用了它的interrupt( ),這種事就發(fā)生了。(interrupt( )也會(huì)影響處于wait( )join( )狀態(tài)的線程,所以這兩個(gè)方法也要放在try域里。后面我們會(huì)再講。)如果你準(zhǔn)備用interrupt( )喚醒線程,那最好是用wait( )而不是sleep( ),因?yàn)檫@兩者的catch語(yǔ)句是不一樣的。這里我們所遵循的原則是:"除非知道該怎樣去處理異常,否則別去捕捉"。所以,我們把它當(dāng)作RuntimeException往外面拋。

你會(huì)發(fā)現(xiàn)輸出變得有規(guī)律了——每個(gè)線程都會(huì)在其它線程開始之前倒數(shù)到零。這是因?yàn)槲覀儗?duì)每個(gè)線程都用了join( )(馬上就要講),于是main( )會(huì)在繼續(xù)下一步的執(zhí)行之前,先等這個(gè)線程結(jié)束。如果沒(méi)有join( ),你會(huì)看到,線程還是在以隨機(jī)的方式運(yùn)行,也就是說(shuō)sleep( )也不是什么控制線程執(zhí)行的辦法。它只是暫停線程。唯一能保證的事情是,它會(huì)休眠至少100毫秒,但是它恢復(fù)運(yùn)行所花的時(shí)間可能更長(zhǎng),因?yàn)樵谛菝呓Y(jié)束之后,線程調(diào)度機(jī)制還要花時(shí)間來(lái)接管。

如果你一定要控制線程的執(zhí)行順序,那最徹底的辦法還是不用線程。你可以自己寫一個(gè)協(xié)作程序,讓它按一定順序交換程序的運(yùn)行權(quán)。

優(yōu)先級(jí)

線程的優(yōu)先級(jí)(priority)的作用是,告訴線程調(diào)度機(jī)制這個(gè)線程的重要程度的高低。雖然CPU伺候線程的順序是非決定性的,但是如果有很多線程堵在那里等著啟動(dòng),線程調(diào)度機(jī)制會(huì)傾向于首先啟動(dòng)優(yōu)先級(jí)最高的線程。但這并不意味著低優(yōu)先級(jí)的線程就沒(méi)機(jī)會(huì)運(yùn)行了(也就是說(shuō)優(yōu)先級(jí)不會(huì)造成死鎖)。優(yōu)先級(jí)低只表示運(yùn)行的機(jī)會(huì)少而已。

下面我們用優(yōu)先級(jí)來(lái)修改SimpleThread.java。優(yōu)先級(jí)是通過(guò)ThreadsetPriority( )方法調(diào)整的。

//: c13:SimplePriorities.java
// Shows the use of thread priorities.
import com.bruceeckel.simpletest.*;
public class SimplePriorities extends Thread {
private static Test monitor = new Test();
private int countDown = 5;
private volatile double d = 0; // No optimization
public SimplePriorities(int priority) {
setPriority(priority);
start();
}
public String toString() {
return super.toString() + ": " + countDown;
}
public void run() {
while(true) {
// An expensive, interruptable operation:
for(int i = 1; i < 100000; i++)
d = d + (Math.PI + Math.E) / (double)i;
System.out.println(this);
if(--countDown == 0) return;
}
}
public static void main(String[] args) {
new SimplePriorities(Thread.MAX_PRIORITY);
for(int i = 0; i < 5; i++)
new SimplePriorities(Thread.MIN_PRIORITY);
monitor.expect(new String[] {
"Thread[Thread-1,10,main]: 5",
"Thread[Thread-1,10,main]: 4",
"Thread[Thread-1,10,main]: 3",
"Thread[Thread-1,10,main]: 2",
"Thread[Thread-1,10,main]: 1",
"Thread[Thread-2,1,main]: 5",
"Thread[Thread-2,1,main]: 4",
"Thread[Thread-2,1,main]: 3",
"Thread[Thread-2,1,main]: 2",
"Thread[Thread-2,1,main]: 1",
"Thread[Thread-3,1,main]: 5",
"Thread[Thread-4,1,main]: 5",
"Thread[Thread-5,1,main]: 5",
"Thread[Thread-6,1,main]: 5",
"Thread[Thread-3,1,main]: 4",
"Thread[Thread-4,1,main]: 4",
"Thread[Thread-5,1,main]: 4",
"Thread[Thread-6,1,main]: 4",
"Thread[Thread-3,1,main]: 3",
"Thread[Thread-4,1,main]: 3",
"Thread[Thread-5,1,main]: 3",
"Thread[Thread-6,1,main]: 3",
"Thread[Thread-3,1,main]: 2",
"Thread[Thread-4,1,main]: 2",
"Thread[Thread-5,1,main]: 2",
"Thread[Thread-6,1,main]: 2",
"Thread[Thread-4,1,main]: 1",
"Thread[Thread-3,1,main]: 1",
"Thread[Thread-6,1,main]: 1",
"Thread[Thread-5,1,main]: 1"
}, Test.IGNORE_ORDER + Test.WAIT);
}
} ///:~

這里我們用Thread.toString( )方法覆寫了SimplePrioritytoString( )。Thread.toString( )會(huì)打印線程的名字(你可以在構(gòu)造函數(shù)里設(shè)置這個(gè)名字;不過(guò)這里,我們用了線程自動(dòng)生成的Thread-1, Thread-2等),優(yōu)先級(jí),及其所屬的"線程組"。由于線程是自我標(biāo)識(shí)的,因此我們沒(méi)用threadNumber。覆寫后的toString( )還會(huì)顯示countDown的值。

你會(huì)發(fā)現(xiàn),線程1的優(yōu)先級(jí)被設(shè)到了最高水平,而其它線程的優(yōu)先級(jí)都設(shè)在最低水平。

run( )進(jìn)行了100,000次非常耗時(shí)的double型加法和除法運(yùn)算。把變量d設(shè)成volatile是為了排除優(yōu)化。要不然,你根本沒(méi)法看出優(yōu)先級(jí)的效果(可以試試把運(yùn)算用的for循環(huán)去掉)。通過(guò)運(yùn)算,你能看出,線程調(diào)度機(jī)制為線程1安排了最多的機(jī)會(huì)。雖然往控制臺(tái)輸出也是一種很"昂貴"的操作,但是你卻不能用它來(lái)觀察優(yōu)先級(jí)的效果,因?yàn)榫€程往控制臺(tái)打印的時(shí)候是不會(huì)被中斷的(否則控制臺(tái)的顯示就亂了),但是數(shù)學(xué)運(yùn)算是可以被打斷的。對(duì)線程調(diào)度機(jī)制來(lái)說(shuō),這次運(yùn)算的耗時(shí)長(zhǎng)得足以讓它注意到線程1的高優(yōu)先級(jí)。

此外,你還可以用getPriority( )來(lái)讀取線程的優(yōu)先級(jí),用setPriority( )隨時(shí)修改線程的優(yōu)先級(jí)(而不是像SimplePriorities.java那樣,在構(gòu)造函數(shù)里面設(shè)置)。

雖然JDK提供了10級(jí)優(yōu)先級(jí),但是卻不能很好地映射到很多操作系統(tǒng)上。比方說(shuō),Windows 2000平臺(tái)上有7個(gè)等級(jí)還沒(méi)固定下來(lái),因此映射是不確定的(雖然Sun的Solaris有231個(gè)等級(jí))。要想保持可移植性,唯一的辦法就是,在調(diào)整優(yōu)先級(jí)的時(shí)候,盯住MIN_PRIORITY, NORM_PRIORITY, 和MIN_PRORITY

守護(hù)線程

所謂"守護(hù)線程(daemon thread)"是指,只要程序還在運(yùn)行,它就應(yīng)該在后臺(tái)提供某種公共服務(wù)的線程,但是守護(hù)線程不屬于程序的核心部分。因此,當(dāng)所有非守護(hù)線程都運(yùn)行結(jié)束的時(shí)候,程序也結(jié)束了。相反,只要還有非守護(hù)線程在運(yùn)行,程序就不能結(jié)束。比如,運(yùn)行main( )的線程就屬于非守護(hù)線程。

//: c13:SimpleDaemons.java
// Daemon threads don‘t prevent the program from ending.
public class SimpleDaemons extends Thread {
public SimpleDaemons() {
setDaemon(true); // Must be called before start()
start();
}
public void run() {
while(true) {
try {
sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(this);
}
}
public static void main(String[] args) {
for(int i = 0; i < 10; i++)
new SimpleDaemons();
}
} ///:~

要想創(chuàng)建守護(hù)線程,必須在它啟動(dòng)之前就setDaemon( )。我們讓SimpleDaemonsrun( )睡了一小會(huì)兒,但是當(dāng)所有的線程全都啟動(dòng)了,程序也就結(jié)束了,甚至連讓線程打印的時(shí)間都沒(méi)留下。這是因?yàn)橐呀?jīng)沒(méi)有非守護(hù)線程(這里只有main( )是)來(lái)維持程序的運(yùn)行了。于是程序什么都沒(méi)打印就結(jié)束了。

可以用isDaemon( )來(lái)判斷一個(gè)線程是不是守護(hù)線程。守護(hù)線程所創(chuàng)建的線程也自動(dòng)是守護(hù)線程。請(qǐng)看下面這個(gè)例子:

//: c13:Daemons.java
// Daemon threads spawn other daemon threads.
import java.io.*;
import com.bruceeckel.simpletest.*;
class Daemon extends Thread {
private Thread[] t = new Thread[10];
public Daemon() {
setDaemon(true);
start();
}
public void run() {
for(int i = 0; i < t.length; i++)
t[i] = new DaemonSpawn(i);
for(int i = 0; i < t.length; i++)
System.out.println("t[" + i + "].isDaemon() = "
+ t[i].isDaemon());
while(true)
yield();
}
}
class DaemonSpawn extends Thread {
public DaemonSpawn(int i) {
start();
System.out.println("DaemonSpawn " + i + " started");
}
public void run() {
while(true)
yield();
}
}
public class Daemons {
private static Test monitor = new Test();
public static void main(String[] args) throws Exception {
Thread d = new Daemon();
System.out.println("d.isDaemon() = " + d.isDaemon());
// Allow the daemon threads to
// finish their startup processes:
Thread.sleep(1000);
monitor.expect(new String[] {
"d.isDaemon() = true",
"DaemonSpawn 0 started",
"DaemonSpawn 1 started",
"DaemonSpawn 2 started",
"DaemonSpawn 3 started",
"DaemonSpawn 4 started",
"DaemonSpawn 5 started",
"DaemonSpawn 6 started",
"DaemonSpawn 7 started",
"DaemonSpawn 8 started",
"DaemonSpawn 9 started",
"t[0].isDaemon() = true",
"t[1].isDaemon() = true",
"t[2].isDaemon() = true",
"t[3].isDaemon() = true",
"t[4].isDaemon() = true",
"t[5].isDaemon() = true",
"t[6].isDaemon() = true",
"t[7].isDaemon() = true",
"t[8].isDaemon() = true",
"t[9].isDaemon() = true"
}, Test.IGNORE_ORDER + Test.WAIT);
}
} ///:~

Daemon先將自己的守護(hù)標(biāo)志設(shè)成"true",再創(chuàng)建了一些線程。它并沒(méi)有去設(shè)置這些線程的daemon狀態(tài),但是測(cè)試的結(jié)果卻告訴我們,它們都是守護(hù)線程。接著它進(jìn)入了一個(gè)無(wú)限循環(huán),不斷地調(diào)用yield( )把控制權(quán)交給別的線程。

一旦main( )完成了任務(wù),程序也就結(jié)束了。這是因?yàn)槌耸刈o(hù)線程,已經(jīng)沒(méi)別的東西在運(yùn)行了。我們讓main( )的線程睡上一秒鐘,這樣你就能看清楚守護(hù)線程是怎樣啟動(dòng)的了。如果不這么做,你只能看到部分線程的創(chuàng)建。(試著調(diào)整sleep( )的時(shí)間,看看會(huì)有什么結(jié)果。)

連接線程

線程還能調(diào)用另一個(gè)線程的join( ),等那個(gè)線程結(jié)束之后再繼續(xù)運(yùn)行。如果線程調(diào)用了調(diào)用了另一個(gè)線程tt.join( ),那么在線程t結(jié)束之前(判斷標(biāo)準(zhǔn)是,t.isAlive( )等于false),主叫線程會(huì)被掛起。

調(diào)用join( )的時(shí)候可以給一個(gè)timeout參數(shù),(可以是以毫秒,也可以是以納秒作單位),這樣如果目標(biāo)線程在時(shí)限到期之后還沒(méi)有結(jié)束,join( )就會(huì)強(qiáng)制返回了。

join( )調(diào)用可以被主叫線程的interrupt( )打斷,所以join( )也要用try-catch括起來(lái)。

所有這些事情,下面這段代碼里全有了:

//: c13:Joining.java
// Understanding join().
import com.bruceeckel.simpletest.*;
class Sleeper extends Thread {
private int duration;
public Sleeper(String name, int sleepTime) {
super(name);
duration = sleepTime;
start();
}
public void run() {
try {
sleep(duration);
} catch (InterruptedException e) {
System.out.println(getName() + " was interrupted. " +
"isInterrupted(): " + isInterrupted());
return;
}
System.out.println(getName() + " has awakened");
}
}
class Joiner extends Thread {
private Sleeper sleeper;
public Joiner(String name, Sleeper sleeper) {
super(name);
this.sleeper = sleeper;
start();
}
public void run() {
try {
sleeper.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(getName() + " join completed");
}
}
public class Joining {
private static Test monitor = new Test();
public static void main(String[] args) {
Sleeper
sleepy = new Sleeper("Sleepy", 1500),
grumpy = new Sleeper("Grumpy", 1500);
Joiner
dopey = new Joiner("Dopey", sleepy),
doc = new Joiner("Doc", grumpy);
grumpy.interrupt();
monitor.expect(new String[] {
"Grumpy was interrupted. isInterrupted(): false",
"Doc join completed",
"Sleepy has awakened",
"Dopey join completed"
}, Test.AT_LEAST + Test.WAIT);
}
} ///:~

Sleeper是一個(gè)會(huì)睡上一段時(shí)間的Thread,至于睡多長(zhǎng)時(shí)間,這要由構(gòu)造函數(shù)的參數(shù)決定。Sleeperrun( )sleep( )可以因時(shí)限到期而返回,也可以被interrupt( )打斷。catch語(yǔ)句在報(bào)告中斷的同時(shí),會(huì)一并報(bào)告isInterrupted( )。當(dāng)有別的線程調(diào)用了本線程的interrupt( )時(shí),會(huì)設(shè)置一個(gè)標(biāo)記以表示這個(gè)這個(gè)線程被打斷了。當(dāng)本線程捕獲這個(gè)異常的時(shí)候,會(huì)清除這個(gè)標(biāo)志。所以catch語(yǔ)句會(huì)永遠(yuǎn)報(bào)告說(shuō)isInterrupted( )是false。這個(gè)標(biāo)記是用來(lái)應(yīng)付其它情況的,或許在沒(méi)出異常的情況下,線程要用它來(lái)檢查自己是不是被中斷了。

Joiner是另一個(gè)線程,它調(diào)用了Sleeperjoin( ),所以它要等Sleeper醒過(guò)來(lái)。main( )創(chuàng)建了兩個(gè)Sleeper分派給兩個(gè)Joiner。你會(huì)發(fā)現(xiàn),不論Sleeper是被打斷還是正常結(jié)束,Joiner都會(huì)隨Sleeper一道結(jié)束。

另外一種方式

迄今為止,你所看到的都是些很簡(jiǎn)單的例子。這些線程都繼承了Thread,這種做法很很明智,對(duì)象只是作為線程,不做別的事情。但是類可能已經(jīng)繼承了別的類,這樣它就不能再繼承Thread了(Java不支持多重繼承)。這時(shí),你就要用Runnable接口了。Runnable的意思是,這個(gè)類實(shí)現(xiàn)了run( )方法,而Thread就是Runnable的。

下面我們來(lái)演示一下這種方法的基本步驟:

//: c13:RunnableThread.java
// SimpleThread using the Runnable interface.
public class RunnableThread implements Runnable {
private int countDown = 5;
public String toString() {
return "#" + Thread.currentThread().getName() +
": " + countDown;
}
public void run() {
while(true) {
System.out.println(this);
if(--countDown == 0) return;
}
}
public static void main(String[] args) {
for(int i = 1; i <= 5; i++)
new Thread(new RunnableThread(), "" + i).start();
// Output is like SimpleThread.java
}
} ///:~

Runnable接口只有一個(gè)方法,那就是run( ),但是如果你想對(duì)它做一些Thread對(duì)象才能做的事情(比方說(shuō)toString( )里面的getName( )),你就必須用Thread.currentThread( )去獲取其reference。Thread類有一個(gè)構(gòu)造函數(shù),可以拿Runnable和線程的名字作參數(shù)。

如果對(duì)象是Runnable的,那只說(shuō)明它有run( )方法。這并沒(méi)有什么特別的,也就是說(shuō),不會(huì)因?yàn)樗?span id="smklktg" class=original_words>Runnable的,就使它具備了線程的先天功能,這一點(diǎn)同Thread的派生類不同的。所以你必須像例程那樣,用Runnable對(duì)象去創(chuàng)建線程。把Runnable對(duì)象傳給Thread的構(gòu)造函數(shù),創(chuàng)建一個(gè)獨(dú)立的Thread對(duì)象。接著再調(diào)用那個(gè)線程的start( ),由它來(lái)進(jìn)行初始化,然后線程的調(diào)度機(jī)制就能調(diào)用run( )了。

Runnable interface的好處在于,所有東西都屬于同一個(gè)類;也就是說(shuō)Runnable能讓你創(chuàng)建基類和其它接口的mixin(混合類)。如果你要訪問(wèn)其它東西,直接用就是了,不用再一個(gè)一個(gè)地打交道。但是內(nèi)部類也有這個(gè)功能,它也可以直接訪問(wèn)宿主類的成員。所以這個(gè)理由不足以說(shuō)服我們放棄Thread的內(nèi)部類而去使用Runnable的mixin。

Runnable的意思是,你要用代碼——也就是run( )方法——來(lái)描述一個(gè)處理過(guò)程,而不是創(chuàng)建一個(gè)表示這個(gè)處理過(guò)程的對(duì)象。在如何理解線程方面,一直存在著爭(zhēng)議。這取決于,你是將線程看作是對(duì)象還是處理過(guò)程[68]。如果你認(rèn)為它是一個(gè)處理過(guò)程,那么你就擺脫了"萬(wàn)物皆對(duì)象"的OO教條。但與此同時(shí),如果你只想讓這個(gè)處理過(guò)程掌管程序的某一部分,那你就沒(méi)理由讓整個(gè)類都成為Runnable的。有鑒于此,用內(nèi)部類的形式將線程代碼隱藏起來(lái),通常是個(gè)更明智的選擇。就像下面這段代碼:

//: c13:ThreadVariations.java
// Creating threads with inner classes.
import com.bruceeckel.simpletest.*;
// Using a named inner class:
class InnerThread1 {
private int countDown = 5;
private Inner inner;
private class Inner extends Thread {
Inner(String name) {
super(name);
start();
}
public void run() {
while(true) {
System.out.println(this);
if(--countDown == 0) return;
try {
sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public String toString() {
return getName() + ": " + countDown;
}
}
public InnerThread1(String name) {
inner = new Inner(name);
}
}
// Using an anonymous inner class:
class InnerThread2 {
private int countDown = 5;
private Thread t;
public InnerThread2(String name) {
t = new Thread(name) {
public void run() {
while(true) {
System.out.println(this);
if(--countDown == 0) return;
try {
sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public String toString() {
return getName() + ": " + countDown;
}
};
t.start();
}
}
// Using a named Runnable implementation:
class InnerRunnable1 {
private int countDown = 5;
private Inner inner;
private class Inner implements Runnable {
Thread t;
Inner(String name) {
t = new Thread(this, name);
t.start();
}
public void run() {
while(true) {
System.out.println(this);
if(--countDown == 0) return;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public String toString() {
return t.getName() + ": " + countDown;
}
}
public InnerRunnable1(String name) {
inner = new Inner(name);
}
}
// Using an anonymous Runnable implementation:
class InnerRunnable2 {
private int countDown = 5;
private Thread t;
public InnerRunnable2(String name) {
t = new Thread(new Runnable() {
public void run() {
while(true) {
System.out.println(this);
if(--countDown == 0) return;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public String toString() {
return Thread.currentThread().getName() +
": " + countDown;
}
}, name);
t.start();
}
}
// A separate method to run some code as a thread:
class ThreadMethod {
private int countDown = 5;
private Thread t;
private String name;
public ThreadMethod(String name) { this.name = name; }
public void runThread() {
if(t == null) {
t = new Thread(name) {
public void run() {
while(true) {
System.out.println(this);
if(--countDown == 0) return;
try {
sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public String toString() {
return getName() + ": " + countDown;
}
};
t.start();
}
}
}
public class ThreadVariations {
private static Test monitor = new Test();
public static void main(String[] args) {
new InnerThread1("InnerThread1");
new InnerThread2("InnerThread2");
new InnerRunnable1("InnerRunnable1");
new InnerRunnable2("InnerRunnable2");
new ThreadMethod("ThreadMethod").runThread();
monitor.expect(new String[] {
"InnerThread1: 5",
"InnerThread2: 5",
"InnerThread2: 4",
"InnerRunnable1: 5",
"InnerThread1: 4",
"InnerRunnable2: 5",
"ThreadMethod: 5",
"InnerRunnable1: 4",
"InnerThread2: 3",
"InnerRunnable2: 4",
"ThreadMethod: 4",
"InnerThread1: 3",
"InnerRunnable1: 3",
"ThreadMethod: 3",
"InnerThread1: 2",
"InnerThread2: 2",
"InnerRunnable2: 3",
"InnerThread2: 1",
"InnerRunnable2: 2",
"InnerRunnable1: 2",
"ThreadMethod: 2",
"InnerThread1: 1",
"InnerRunnable1: 1",
"InnerRunnable2: 1",
"ThreadMethod: 1"
}, Test.IGNORE_ORDER + Test.WAIT);
}
} ///:~

InnerThread1創(chuàng)建了一個(gè)繼承Thread的非匿名內(nèi)部類,然后在構(gòu)造函數(shù)里創(chuàng)建了一個(gè)這個(gè)內(nèi)部類的實(shí)例。如果你要用宿主類的方法訪問(wèn)內(nèi)部類的特殊功能(新方法)的話,這種做法會(huì)比較好。但是絕大多數(shù)情況下,創(chuàng)建線程的目的,只是要用它的Thread功能,所以不必創(chuàng)建非匿名的內(nèi)部類。InnerThread2就演示了這個(gè)方案:在構(gòu)造函數(shù)里面創(chuàng)建一個(gè)繼承Thread的匿名內(nèi)部類,然后把它上傳給Thread的reference t。宿主類的其它方法可以通過(guò)t訪問(wèn)Thread的接口,它們無(wú)需知道這個(gè)對(duì)象的確切類型。

第三和第四個(gè)類重復(fù)了前兩個(gè)類,不過(guò)它們都不是Thread,而是Runnable。這么做是想告訴你,用Runnable什么便宜也沒(méi)占到,相反代碼還稍微復(fù)雜了一些(也更難讀了一些)。所以結(jié)論是,除非迫不得已只能用Runnable,否則我會(huì)選Thread。

ThreadMethod演示了怎樣在方法的內(nèi)部創(chuàng)建線程。要運(yùn)行線程的時(shí)候就調(diào)用方法,線程啟動(dòng)之后方法就返回。如果這個(gè)線程不那么重要,只是做一些輔助操作的話,那么相比在構(gòu)造函數(shù)里啟動(dòng)線程,這個(gè)方法或許會(huì)更好一些。

創(chuàng)建反應(yīng)敏捷的用戶界面

正如我們前面所講的,創(chuàng)建反映敏捷的用戶界面是多線程的主要用途之一。雖然我們要到第14章才講講圖形用戶界面,但是現(xiàn)在我們可以做一個(gè)簡(jiǎn)單的控制臺(tái)用戶界面的程序。下面這段程序有兩個(gè)版本,一個(gè)專注于計(jì)算從不理會(huì)控制臺(tái)的輸入,而另一個(gè)將計(jì)算任務(wù)放到線程里面,因此能在進(jìn)行計(jì)算的同時(shí)還監(jiān)聽控制臺(tái)的輸入。

//: c13:ResponsiveUI.java
// User interface responsiveness.
import com.bruceeckel.simpletest.*;
class UnresponsiveUI {
private volatile double d = 1;
public UnresponsiveUI() throws Exception {
while(d > 0)
d = d + (Math.PI + Math.E) / d;
System.in.read(); // Never gets here
}
}
public class ResponsiveUI extends Thread {
private static Test monitor = new Test();
private static volatile double d = 1;
public ResponsiveUI() {
setDaemon(true);
start();
}
public void run() {
while(true) {
d = d + (Math.PI + Math.E) / d;
}
}
public static void main(String[] args) throws Exception {
//! new UnresponsiveUI(); // Must kill this process
new ResponsiveUI();
Thread.sleep(300);
System.in.read(); // ‘monitor‘ provides input
System.out.println(d); // Shows progress
}
} ///:~

UnresponsiveUI將計(jì)算工作放進(jìn)一個(gè)無(wú)限循環(huán),所以很明顯,它永遠(yuǎn)也不會(huì)去理會(huì)控制臺(tái)的輸入(編譯器受騙了,它以為過(guò)了while循環(huán)就能讀到輸入了)。如果你把創(chuàng)建UnresponsiveUI對(duì)象的那行代碼注釋回來(lái),再重新運(yùn)行,那么除非殺掉進(jìn)程否則沒(méi)法退出程序。

要想讓程序反應(yīng)靈敏,可以把運(yùn)算放進(jìn)run( )里面,然后讓搶占式的調(diào)度程序來(lái)管理它,這樣當(dāng)你按下Enter鍵的時(shí)候,就能看到,它在一邊等用戶的輸入一邊做后臺(tái)的計(jì)算(處于測(cè)試的目的,我們讓com.bruceeckel.simpletest.Test對(duì)象自動(dòng)地往System.in.read( )控制臺(tái)寫。關(guān)于這個(gè)對(duì)象,我們到第15章再講)。

共享有限的資源

你可以認(rèn)為單線程程序是一個(gè)在問(wèn)題空間里游走的,一次只作一件事的孤獨(dú)的個(gè)體。由于只有它一個(gè),因此你無(wú)需考慮兩個(gè)實(shí)體同時(shí)申請(qǐng)同一項(xiàng)資源的問(wèn)題。這個(gè)問(wèn)題有點(diǎn)像兩個(gè)人同時(shí)把車停在一個(gè)車位上,同時(shí)穿一扇門,甚至是同時(shí)發(fā)言。

但是在多線程環(huán)境下,事情就不那么簡(jiǎn)單了,你必須考慮兩個(gè)或兩個(gè)以上線程同時(shí)申請(qǐng)同一資源的問(wèn)題。必須杜絕資源訪問(wèn)方面的沖突,否則你就會(huì)碰到兩個(gè)線程同時(shí)訪問(wèn)一個(gè)銀行賬號(hào),同時(shí)往一臺(tái)打印機(jī)上輸出,或者同時(shí)調(diào)整一個(gè)閥值之類的問(wèn)題。

用不正確的方法訪問(wèn)資源

試看下面這段例程。AlwaysEven會(huì)"保證",每次調(diào)用getValue( )的時(shí)候都會(huì)返回一個(gè)偶數(shù)。此外還有一個(gè) "Watcher"線程,它會(huì)不時(shí)地調(diào)用getValue( ),然后檢查這個(gè)數(shù)是不是真的是偶數(shù)。這么做看上去有些多余,因?yàn)閺拇a上看,很明顯這個(gè)值肯定是偶數(shù)。但是意外來(lái)了。下面是源代碼:

//: c13:AlwaysEven.java
// Demonstrating thread collision over resources by
// reading an object in an unstable intermediate state.
public class AlwaysEven {
private int i;
public void next() { i++; i++; }
public int getValue() { return i; }
public static void main(String[] args) {
final AlwaysEven ae = new AlwaysEven();
new Thread("Watcher") {
public void run() {
while(true) {
int val = ae.getValue();
if(val % 2 != 0) {
System.out.println(val);
System.exit(0);
}
}
}
}.start();
while(true)
ae.next();
}
} ///:~

main( )創(chuàng)建了一個(gè)AlwaysEven對(duì)象,由于要供匿名的Thread內(nèi)部類訪問(wèn),因此它必須是final的。如果這個(gè)線程讀出的值不是偶數(shù),它會(huì)把這個(gè)值打印出來(lái)(以此證明它捕獲了一個(gè)正處于不穩(wěn)定狀態(tài)的對(duì)象),然后退出程序。

這個(gè)例子很形象地說(shuō)明了多線程環(huán)境的最本質(zhì)的問(wèn)題:永遠(yuǎn)也不會(huì)知道線程會(huì)在什么時(shí)候啟動(dòng)。想想,你正坐在餐桌邊上,拿著一把叉子,對(duì)準(zhǔn)了盤子里最后一片火腿,正當(dāng)你的叉子要碰到火腿的時(shí)候,它突然消失了(因?yàn)槟阋呀?jīng)被掛起來(lái)了,另一個(gè)線程進(jìn)來(lái)偷走了那片火腿)。這就是寫并發(fā)程序的時(shí)候要處理的問(wèn)題。

有些時(shí)候,你不用關(guān)心別人是不是正在用那個(gè)資源(從別人的盤子里叉火腿)。但是對(duì)多線程環(huán)境,你必須要有辦法能防止兩個(gè)線程同時(shí)訪問(wèn)同一個(gè)資源,至少別在關(guān)鍵的時(shí)候。

要防止這種沖突很簡(jiǎn)單,只要在線程運(yùn)行的時(shí)候給資源上鎖就行了。第一個(gè)訪問(wèn)這個(gè)資源的線程給它上鎖,在它解鎖之前,其它線程都不能訪問(wèn)這個(gè)資源,接著另一個(gè)線程給這個(gè)資源上鎖然后再使用,如此循環(huán)。如果轎車的前座是一個(gè)有限的資源,那么哪個(gè)小孩想坐,它就得大喊一聲"是我的"。

測(cè)試框架

為了簡(jiǎn)化問(wèn)題,在開始講解之前,我們先寫一個(gè)測(cè)試線程程序的小框架。我們的做法是,把這些例程里的公用代碼分離出來(lái)。先說(shuō)"watcher"線程,它的任務(wù)是觀察對(duì)象的"invariant"是不是被破壞了。也就是說(shuō),我們假設(shè)對(duì)象的內(nèi)部狀態(tài)應(yīng)該遵守某種規(guī)則,但是如果你從外部觀察到對(duì)象正處在一個(gè)無(wú)效的中間狀態(tài)的話,那么從用戶的角度來(lái)看它的"invariant"就已經(jīng)遭到破壞了(不是說(shuō)對(duì)象不能處在這種無(wú)效的中間狀態(tài),只是這種狀態(tài)不應(yīng)該被客戶觀察到)。我們要有辦法能觀察它的invariant是不是被破壞了,同時(shí)還要知道這個(gè)值是多少。為了讓方法能同時(shí)返回這兩個(gè)信息,我們把它合并成一個(gè)標(biāo)記接口(tagging interface)。所謂標(biāo)記接口是指只有名字沒(méi)有方法的接口。

//: c13:InvariantState.java
// Messenger carrying invariant data
public interface InvariantState {} ///:~

為了提高代碼的可讀性,根據(jù)我們的設(shè)計(jì),成功或失敗的信息已經(jīng)包含在類的名字和類的類型里了。表示成功的類是:

//: c13:InvariantOK.java
// Indicates that the invariant test succeeded
public class InvariantOK implements InvariantState {} ///:~

表示失敗的類是InvariantFailure。不過(guò)失敗對(duì)象通常要攜帶原因,因此:

//: c13:InvariantFailure.java
// Indicates that the invariant test failed
public class InvariantFailure implements InvariantState {
public Object value;
public InvariantFailure(Object value) {
this.value = value;
}
} ///:~

現(xiàn)在我們可以定義要進(jìn)行測(cè)試的類的接口了:

//: c13:Invariant.java
public interface Invariant {
InvariantState invariant();
} ///:~

在創(chuàng)建通用的"watcher"線程之前,先提醒大家,本章的某些例程,在某些平臺(tái)下運(yùn)行結(jié)果可能會(huì)與預(yù)想的不一樣。這里的很多例子都想說(shuō)明,在多線程的環(huán)境下,單個(gè)線程的運(yùn)行已經(jīng)收到了侵犯,但是這種侵犯不是每次都發(fā)生。[69]還有一些例子,它是想演示這種侵犯(但老是不成工),因此我們可以證明這種侵犯不會(huì)發(fā)生。碰到這種情況,我們就得有辦法讓程序過(guò)幾秒停下來(lái)。下面這個(gè)類做的就是這件事,它繼承了標(biāo)準(zhǔn)類庫(kù)的Timer類:

//: c13:Timeout.java
// Set a time limit on the execution of a program
import java.util.*;
public class Timeout extends Timer {
public Timeout(int delay, final String msg) {
super(true); // Daemon thread
schedule(new TimerTask() {
public void run() {
System.out.println(msg);
System.exit(0);
}
}, delay);
}
} ///:~

delay以毫秒計(jì),時(shí)間到了就打印消息。注意,調(diào)用了super(true)之后,它就是一個(gè)守護(hù)線程了,因此如果其它線程都退出了,它也就結(jié)束了。我們給Timer.schedule( )兩個(gè)參數(shù),一個(gè)是TimerTask(這里用匿名內(nèi)部類的方式創(chuàng)建),另一個(gè)delay。經(jīng)過(guò)delay毫秒,Timeout就開始執(zhí)行TimerTaskrun( )方法。用Timer要比直接用sleep( )更簡(jiǎn)單也清楚。此外,Timer的擴(kuò)展性很好,它能支持大量的(數(shù)以千計(jì)的)并發(fā)任務(wù),因此它還是一個(gè)非常有用的工具。

現(xiàn)在我們?cè)摪?span id="qrausbz" class=original_words>Invariant接口和Timeout類用進(jìn)InvariantWatcher線程了:

//: c13:InvariantWatcher.java
// Repeatedly checks to ensure invariant is not violated
public class InvariantWatcher extends Thread {
private Invariant invariant;
public InvariantWatcher(Invariant invariant) {
this.invariant = invariant;
setDaemon(true);
start();
}
// Stop everything after awhile:
public
InvariantWatcher(Invariant invariant, final int timeOut){
this(invariant);
new Timeout(timeOut,
"Timed out without violating invariant");
}
public void run() {
while(true) {
InvariantState state = invariant.invariant();
if(state instanceof InvariantFailure) {
System.out.println("Invariant violated: "
+ ((InvariantFailure)state).value);
System.exit(0);
}
}
}
} ///:~

構(gòu)造函數(shù)需要一個(gè)測(cè)試用的Invariant對(duì)象,此外線程也是在構(gòu)造函數(shù)里面啟動(dòng)的。第二個(gè)構(gòu)造函數(shù)調(diào)用了第一個(gè),此外它還創(chuàng)建了一個(gè)Timeout對(duì)象。Timeout的任務(wù)是過(guò)一段時(shí)間停止當(dāng)前程序的運(yùn)行——這是給invariant不會(huì)收到侵犯的程序準(zhǔn)備的,否則程序就停不出來(lái)了。run( )會(huì)捕捉當(dāng)前的InvariantState,并對(duì)它進(jìn)行測(cè)試,如果失敗,它會(huì)打印value的值。注意,我們不能從線程內(nèi)部往外面拋異常,因?yàn)檫@只會(huì)中止線程而不是程序。

現(xiàn)在我們可以用這個(gè)框架來(lái)重寫AlwaysEven.java了:

//: c13:EvenGenerator.java
// AlwaysEven.java using the invariance tester
public class EvenGenerator implements Invariant {
private int i;
public void next() { i++; i++; }
public int getValue() { return i; }
public InvariantState invariant() {
int val = i; // Capture it in case it changes
if(val % 2 == 0)
return new InvariantOK();
else
return new InvariantFailure(new Integer(val));
}
public static void main(String[] args) {
EvenGenerator gen = new EvenGenerator();
new InvariantWatcher(gen);
while(true)
gen.next();
}
} ///:~

定義invariant( )的時(shí)候一定要把要觀察的值存到本地變量里面。這樣才能觀測(cè)到真實(shí)的值,而不是會(huì)變的值(被其它線程修改了)。

這里,問(wèn)題不在對(duì)象會(huì)處于無(wú)效狀態(tài),而在于當(dāng)對(duì)象正處于這種無(wú)效狀態(tài)的時(shí)候,別的線程在調(diào)用它的方法。

資源訪問(wèn)的沖突

對(duì)EvenGenerator來(lái)說(shuō),最糟糕的事情就是客戶線程觀察到它正處于不穩(wěn)定的中間狀態(tài)。但是這個(gè)對(duì)象的內(nèi)部數(shù)據(jù)的一致性還是有保證的,最終還是會(huì)回到其正常的狀態(tài)。但是如果真有兩個(gè)線程在同時(shí)修改一個(gè)對(duì)象,那么這種資源沖突的后果就嚴(yán)重了,對(duì)象的狀態(tài)很可能會(huì)被改錯(cuò)。

先簡(jiǎn)單地介紹一下semaphore的概念。Semaphore是一種用于線程間通信的標(biāo)志對(duì)象。如果semaphore的值是零,則線程可以獲得它所監(jiān)視的資源,如果不是零,那么線程就無(wú)法獲取這個(gè)資源,于是線程必須等。如果申請(qǐng)到了資源,線程會(huì)先對(duì)semaphore作遞增,再使用這個(gè)資源。遞增和遞減是原子操作(atomic operation,也就是說(shuō)不會(huì)被打斷的操作),由此semaphore就防止兩個(gè)線程同時(shí)使用同一項(xiàng)資源。

如果semaphore能妥善的看護(hù)它所監(jiān)視的資源,那么對(duì)象就永遠(yuǎn)也不會(huì)陷入不穩(wěn)定狀態(tài)。下面我們先簡(jiǎn)單地實(shí)踐一下semaphore的思想:

//: c13:Semaphore.java
// A simple threading flag
public class Semaphore implements Invariant {
private volatile int semaphore = 0;
public boolean available() { return semaphore == 0; }
public void acquire() { ++semaphore; }
public void release() { --semaphore; }
public InvariantState invariant() {
int val = semaphore;
if(val == 0 || val == 1)
return new InvariantOK();
else
return new InvariantFailure(new Integer(val));
}
} ///:~

這個(gè)類的核心部分由available( ),acquire( ),以及release( )組成,還是比較簡(jiǎn)單明了的。由于線程會(huì)在獲取對(duì)象之前先測(cè)試其是否available( ),因此semaphore的值只可能是0或1,因此我們?cè)?span id="xzmdqzq" class=original_words>invariant( )里面就測(cè)試了這兩個(gè)值。

但是看看Semaphore的效果吧:

//: c13:SemaphoreTester.java
// Colliding over shared resources
public class SemaphoreTester extends Thread {
private volatile Semaphore semaphore;
public SemaphoreTester(Semaphore semaphore) {
this.semaphore = semaphore;
setDaemon(true);
start();
}
public void run() {
while(true)
if(semaphore.available()) {
yield(); // Makes it fail faster
semaphore.acquire();
yield();
semaphore.release();
yield();
}
}
public static void main(String[] args) throws Exception {
Semaphore sem = new Semaphore();
new SemaphoreTester(sem);
new SemaphoreTester(sem);
new InvariantWatcher(sem).join();
}
} ///:~

SemaphoreTester是一個(gè)會(huì)不斷測(cè)試Semaphore對(duì)象是否available的線程,如果可以,它就先acquire,再release。注意semaphore字段是volatile的,這樣就能確保編譯器不對(duì)讀做優(yōu)化了。

main( )創(chuàng)建了兩個(gè)SemaphoreTester線程。你會(huì)發(fā)現(xiàn),不用多長(zhǎng)時(shí)間,invariant就被篡改了。這是因?yàn)?,?dāng)SemaphoreTester調(diào)用available( )的時(shí)候,對(duì)象確實(shí)是可得的,但是等到它調(diào)用acquire( )的時(shí)候,另一個(gè)線程已經(jīng)獲取并修改了這個(gè)對(duì)象的semaphore了。InvariantWater會(huì)發(fā)覺(jué)這個(gè)值太大了,或者兩個(gè)線程同時(shí)release( )之后,發(fā)現(xiàn)這個(gè)值變負(fù)的了。要注意InvariantWatcher已經(jīng)join( )到main線程了,因此程序會(huì)一直運(yùn)行到故障發(fā)生。

我發(fā)現(xiàn)在我的機(jī)器上加了yield( )之后,錯(cuò)誤提前了很多,不過(guò)這與操作系統(tǒng)和JVM有關(guān)。你可以試試把yield( )去掉;可能要花很長(zhǎng)時(shí)間才會(huì)產(chǎn)生錯(cuò)誤,這也證明了多線程程序的排錯(cuò)有多難。

這個(gè)例子再次強(qiáng)調(diào)了并發(fā)編程的風(fēng)險(xiǎn):如果這么簡(jiǎn)單的程序都會(huì)有問(wèn)題,你還能對(duì)并發(fā)環(huán)境報(bào)什么僥幸心理呢。

解決共享資源的沖突

實(shí)際上所有的多線程架構(gòu)都采用串行訪問(wèn)的方式來(lái)解決共享資源的沖突問(wèn)題。也就是說(shuō),同一時(shí)刻只有一個(gè)線程可以訪問(wèn)這個(gè)共享資源。通常是這樣實(shí)現(xiàn)的,在代碼的前后設(shè)一條加鎖和解鎖的語(yǔ)句,這樣同一時(shí)刻只有一個(gè)線程能夠執(zhí)行這段代碼。由于鎖定語(yǔ)句會(huì)產(chǎn)生"互斥(mutual exclusion)"的效果,因此這一機(jī)制通常也被稱為mutex。

就拿你們家的浴室打比方;浴室(共享資源)要供很多人(線程)使用,而且每個(gè)人用的時(shí)候都是獨(dú)占的。要用浴室之前,先要敲門,看看里面是不是有人。如果每人,就進(jìn)去把門鎖上。這樣其它要用浴室的線程,就被堵在門外了,于是它必須等在門外,直到浴室的門重新打開。

等我們講到浴室的門打開,另一個(gè)線程進(jìn)去換走原先那個(gè)線程的時(shí)候,這個(gè)比方就有些不貼切了。實(shí)際上等在外面的線程并沒(méi)有排成一列,相反由于線程的調(diào)度機(jī)制是非決定性的,因此誰(shuí)都不知道誰(shuí)會(huì)是下一個(gè)。這些線程更像是在圍著浴室的門繞圈,當(dāng)浴室的門開的那一剎那,最靠近門的那個(gè)線程會(huì)擠進(jìn)去換走原來(lái)的那個(gè)。前面已經(jīng)講過(guò)了,我們可以用yield( )setPriority( )來(lái)給線程調(diào)度機(jī)制提一些建議,但究竟能起多大作用,還要看平臺(tái)和JVM。

Java提供了內(nèi)置的防止資源沖突的解決方案,這就是synchronized關(guān)鍵詞。它的工作原理很像Semaphore類:當(dāng)線程想執(zhí)行由synchronized看護(hù)的代碼時(shí),它會(huì)先檢查其semaphore是否可得,如果是,它會(huì)先獲取semaphore,再執(zhí)行代碼,用完之后再釋放semaphore。但是和我們寫的Semaphore不同,synchronized是語(yǔ)言內(nèi)置的,因此不會(huì)有什么問(wèn)題。

通常共享資源就是一段內(nèi)存,其表現(xiàn)形式就是對(duì)象,不過(guò)也可以是文件,I/O端口或打印機(jī)之類的。要想控制對(duì)共享資源的訪問(wèn),先把它放進(jìn)對(duì)象里面。然后把所有要訪問(wèn)這個(gè)資源的方法都作成synchronized的。也就是說(shuō),只要有一個(gè)線程還在調(diào)用synchronized方法,其它線程就不允許訪問(wèn)所有的synchronized方法。

通常你會(huì)把類的成員設(shè)成private的,然后用方法進(jìn)行訪問(wèn),因此你可以把方法做成synchronized。下面就是synchronized方法的聲明:

synchronized void f() { /* ... */ }
synchronized void g(){ /* ... */ }

每個(gè)對(duì)象都有一個(gè)鎖(也稱監(jiān)控器monitor),它是對(duì)象生來(lái)就有的東西(因此你不必為此寫任何代碼)。當(dāng)你調(diào)用synchronized方法時(shí),這個(gè)對(duì)象就被鎖住了。在方法返回并且解鎖之前,誰(shuí)也不能調(diào)用同一個(gè)對(duì)象的其它synchronized方法。就說(shuō)上面那兩個(gè)方法,如果你調(diào)用了f( ),那么在f( )返回并且解鎖之前,你是不能調(diào)用同一個(gè)對(duì)象的g( )的。因此對(duì)任何一個(gè)特定的對(duì)象,所有的synchronized方法都會(huì)共享一個(gè)鎖,而這個(gè)鎖能防止兩個(gè)或兩個(gè)以上線程同時(shí)讀寫一塊共用內(nèi)存。

一個(gè)線程能多次獲得對(duì)象的鎖。也就是說(shuō),一個(gè)synchronized方法調(diào)用了另一個(gè)synchronized方法,而后者又調(diào)用了另一synchronized方法,諸如此類。JVM會(huì)跟蹤對(duì)象被上鎖的次數(shù)。如果對(duì)象沒(méi)有被鎖住,那么它的計(jì)數(shù)器應(yīng)該為零。當(dāng)線程第一次獲得對(duì)象的鎖時(shí),計(jì)數(shù)器為一。線程每獲一次對(duì)象的鎖,計(jì)數(shù)器就加一。當(dāng)然,只有第一次獲得對(duì)象鎖的線程才能多次獲得鎖。線程每退出一個(gè)synchronized方法,計(jì)數(shù)器就減一。等減到零了,對(duì)象也就解鎖了,這時(shí)其它線程就可以使用這個(gè)對(duì)象了。

此外每個(gè)類還有一個(gè)鎖(它屬于類的Class對(duì)象),這樣當(dāng)類的synchronized static方法讀取static數(shù)據(jù)的時(shí)候,就不會(huì)相互干擾了。

Synchronized改寫EvenGenerator

只要用synchronized稍加處理,EvenGenerator.java的線程沖突問(wèn)題就能迎刃而解:

//: c13:SynchronizedEvenGenerator.java
// Using "synchronized" to prevent thread collisions
public
class SynchronizedEvenGenerator implements Invariant {
private int i;
public synchronized void next() { i++; i++; }
public synchronized int getValue() { return i; }
// Not synchronized so it can run at
// any time and thus be a genuine test:
public InvariantState invariant() {
int val = getValue();
if(val % 2 == 0)
return new InvariantOK();
else
return new InvariantFailure(new Integer(val));
}
public static void main(String[] args) {
SynchronizedEvenGenerator gen =
new SynchronizedEvenGenerator();
new InvariantWatcher(gen, 4000); // 4-second timeout
while(true)
gen.next();
}
} ///:~

你會(huì)發(fā)現(xiàn)next( )getValue( )都是synchronized。如果你只對(duì)其中一個(gè)做synchronized,那么另一個(gè)就被忽略了,于是其它線程就能肆無(wú)忌憚地調(diào)用它了。一定要記住:所有訪問(wèn)共享資源的方法都必須是synchronized的,否則程序肯定會(huì)出錯(cuò)。而invariant( )(譯者注:原文為InvariantState,疑有誤)倒不是synchronized的,這是因?yàn)樗枪y(cè)試線程用的,因此我們希望它能隨時(shí)被調(diào)用,只有這樣才能算是真正的測(cè)試。

原子操作

"原子操作(atomic operation)是不需要synchronized",這是Java多線程編程的老生常談了。所謂原子操作是指不會(huì)被線程調(diào)度機(jī)制打斷的操作;這種操作一旦開始,就一直運(yùn)行倒結(jié)束,中間不會(huì)有任何context switch(切換到另一個(gè)線程)。

通常所說(shuō)的原子操作包括對(duì)非longdouble型的primitive進(jìn)行賦值,以及返回這兩者之外的primitive。之所以要把它們排除在外是因?yàn)樗鼈兌急容^大,而JVM的設(shè)計(jì)規(guī)范又沒(méi)有要求讀操作和賦值操作必須是原子操作(JVM可以試著去這么作,但并不保證)。不過(guò)如果你在longdouble前面加了volatile,那么它就肯定是原子操作了。

如果你一知半解地把這條規(guī)則用到SynchronizedEvenGenerator.java上,就會(huì)發(fā)覺(jué):

public synchronized int getValue() { return i; }

好像很符合原子操作的定義嘛。但是把synchronized去掉試試看,程序很快就出了錯(cuò)。這是因?yàn)?,雖然return i是原子操作,但刪掉synchronized之后,別的線程就能在它還處于不穩(wěn)定狀態(tài)的時(shí)候讀到它了。在做這種優(yōu)化之前,先得真正弄懂這么做的后果是什么。這里沒(méi)有現(xiàn)成的經(jīng)驗(yàn)。

下面,我們來(lái)看一個(gè)更簡(jiǎn)單的例子:一個(gè)返回序列號(hào)的類。[70]每次調(diào)用nextSerialNumber( )的時(shí)候,它都會(huì)返回一個(gè)唯一的值:

//: c13:SerialNumberGenerator.java
public class SerialNumberGenerator {
private static volatile int serialNumber = 0;
public static int nextSerialNumber() {
return serialNumber++;
}
} ///:~

SerialNumberGenerator大概是你能想到的最簡(jiǎn)單的例子了。如果你是從C++轉(zhuǎn)過(guò)來(lái)的,或者有其它低級(jí)語(yǔ)言的經(jīng)驗(yàn),你會(huì)認(rèn)為遞增肯定是一個(gè)原子操作,因?yàn)樗ǔ6际怯肅PU的指令來(lái)實(shí)現(xiàn)的。但是在JVM里,遞增不是原子操作,它涉及到了讀和寫。所以即便是這么簡(jiǎn)單的一個(gè)操作,多線程也有機(jī)可乘。

serialNumber字段是volatile的,這是因?yàn)槊總€(gè)線程都有一個(gè)保存變量副本的本地棧,如果你把變量定義為volatile的,那么編譯器就不會(huì)做任何優(yōu)化了。而優(yōu)化的意思就是減少數(shù)據(jù)同步的讀寫。

為了防止發(fā)現(xiàn)問(wèn)題的用時(shí)過(guò)長(zhǎng)所造成的數(shù)據(jù)量太大的問(wèn)題,我們得先準(zhǔn)備一個(gè)set。它重復(fù)利用了同一塊內(nèi)存來(lái)存儲(chǔ)int,因此不會(huì)有內(nèi)存不夠的問(wèn)題。這就是CircularSet。這么做的依據(jù)是,當(dāng)我們重新繞回來(lái)的時(shí)候,沖突的可能性已經(jīng)非常小了。為了防止線程沖突,我們把add( )contains( )方法做成synchronized的。

//: c13:SerialNumberChecker.java
// Operations that may seem safe are not,
// when threads are present.
// Reuses storage so we don‘t run out of memory:
class CircularSet {
private int[] array;
private int len;
private int index = 0;
public CircularSet(int size) {
array = new int[size];
len = size;
// Initialize to a value not produced
// by the SerialNumberGenerator:
for(int i = 0; i < size; i++)
array[i] = -1;
}
public synchronized void add(int i) {
array[index] = i;
// Wrap index and write over old elements:
index = ++index % len;
}
public synchronized boolean contains(int val) {
for(int i = 0; i < len; i++)
if(array[i] == val) return true;
return false;
}
}
public class SerialNumberChecker {
private static CircularSet serials =
new CircularSet(1000);
static class SerialChecker extends Thread {
SerialChecker() { start(); }
public void run() {
while(true) {
int serial =
SerialNumberGenerator.nextSerialNumber();
if(serials.contains(serial)) {
System.out.println("Duplicate: " + serial);
System.exit(0);
}
serials.add(serial);
}
}
}
public static void main(String[] args) {
for(int i = 0; i < 10; i++)
new SerialChecker();
// Stop after 4 seconds:
new Timeout(4000, "No duplicates detected");
}
} ///:~

SerialNumberChecker包含一個(gè)保存著所有已經(jīng)被提取出來(lái)的序列號(hào)static CircularSet。此外它還內(nèi)嵌了一個(gè)會(huì)提取序列號(hào),并保證其唯一性的SerialChecker線程。我們創(chuàng)建了很多線程,因此問(wèn)題出現(xiàn)的還算比較快(在你的機(jī)器上,很可能跑很長(zhǎng)時(shí)間也不會(huì)出問(wèn)題,但是在多CPU的系統(tǒng)上,它確實(shí)沖突了)。要解決這個(gè)問(wèn)題,只要把synchronized關(guān)鍵詞加到nextSerialNumber( )前面就行了。

最安全的原子操作只有讀取和對(duì)primitive賦值這兩種。但是正如EvenGenerator.java所揭示的,原子操作也能訪問(wèn)正處于無(wú)效狀態(tài)的對(duì)象,所以絕對(duì)不能想當(dāng)然。我們一開頭就講了,longdouble型的操作不一定時(shí)原子操作(雖然有些JVM能保證longdouble也是原子操作,但是如果你真的用了這個(gè)特性的話,代碼就沒(méi)有可移植性了。)

最安全的做法還是遵循如下的方針:

  1. 如果你要synchronize類的一個(gè)方法,索性把所有的方法全都synchronize了。要判斷,哪個(gè)方法該synchronize,哪個(gè)方法可以不synchronize,通常是很難的,而且也沒(méi)什么把握。
  2. 刪除synchronized的時(shí)候要絕對(duì)小心。通常這么做是為了性能,但是synchronized的開銷在JDK1.3和1.4里已經(jīng)大為降低了。此外,只有在用profiler分析過(guò),確認(rèn)synchronized確實(shí)是瓶頸的前提下才能這么作。

修訂Semaphore類

現(xiàn)在,我們?cè)賮?lái)看看Semaphore.java??瓷先?,只要把這三個(gè)方法做成sychronized就能解決問(wèn)題了,就像這樣:

//: c13:SynchronizedSemaphore.java
// Colliding over shared resources
public class SynchronizedSemaphore extends Semaphore {
private volatile int semaphore = 0;
public synchronized boolean available() {
return semaphore == 0;
}
public synchronized void acquire() { ++semaphore; }
public synchronized void release() { --semaphore; }
public InvariantState invariant() {
int val = semaphore;
if(val == 0 || val == 1)
return new InvariantOK();
else
return new InvariantFailure(new Integer(val));
}
public static void main(String[] args) throws Exception {
SynchronizedSemaphore sem =new SynchronizedSemaphore();
new SemaphoreTester(sem);
new SemaphoreTester(sem);
new InvariantWatcher(sem).join();
}
} ///:~

乍看上去SynchronizedSemaphore實(shí)在是太怪了——它繼承自Semaphore,但是與原來(lái)的方法相比,覆寫的方法只是多了個(gè)synchronized。Java覆寫方法的時(shí)候是不允許修改方法的特征簽名的,但是這里為什么不會(huì)報(bào)錯(cuò)。這是因?yàn)?span id="eboekql" class=original_words>synchronized關(guān)鍵詞不屬于方法的特征簽名,所以你可以隨便加。

之所以要繼承Semaphore是為了能復(fù)用SemaphoreTester。但是等程序開始運(yùn)行,你還是會(huì)看到InvariantFailure。

為什么還是不行?這是因?yàn)椋€程要根據(jù)available( )的返回值來(lái)判斷Semaphore是不是可得,因此它得先釋放對(duì)象的鎖再知道它是不是可得。這時(shí)另一個(gè)線程沖了進(jìn)來(lái),搶在這個(gè)線程的前頭給Semaphore加了一。對(duì)此,第一個(gè)線程一無(wú)所知,在它看來(lái)Semaphore仍然是可得的,因此它會(huì)傻乎乎地調(diào)用acquire( ),從而將Semaphore置于無(wú)效狀態(tài)。又一個(gè)血淋淋的教訓(xùn)。所以千萬(wàn)要牢記并發(fā)編程的最高法則:絕對(duì)不能想當(dāng)然。

要解決這個(gè)問(wèn)題,唯一的辦法就是將可得性測(cè)試與獲取合并成一個(gè)原子操作——也就是對(duì)象鎖和synchronized關(guān)鍵詞所提供的效果。也就是說(shuō),對(duì)象鎖和synchronized關(guān)鍵詞是Java內(nèi)置的semaphore,因此沒(méi)必要再去搞一套了。

關(guān)鍵段

有時(shí)你只需要防止多個(gè)線程同時(shí)訪問(wèn)方法中的某一部分,而不是整個(gè)方法。這種需要隔離的代碼就被稱為關(guān)鍵段(critical section)。創(chuàng)建關(guān)鍵段需要用到synchronized關(guān)鍵詞。這里,synchronized的作用是,指明執(zhí)行下列代碼需獲得哪個(gè)對(duì)象的鎖。

synchronized(syncObject) {
// This code can be accessed 
// by only one thread at a time
}


關(guān)鍵段又被稱為"同步塊(synchronized block)";線程在執(zhí)段代碼之前,必須先獲得syncObject的鎖。如果其它線程已經(jīng)獲得這個(gè)鎖了,那么在它解鎖之前,線程不能運(yùn)行關(guān)鍵段中的代碼。

同步分兩種,代碼的同步和方法的同步。下面這段例程比較了這兩種方案。通過(guò)對(duì)比可以看出,相比同步整個(gè)方法,同步一段代碼能顯著增加其它線程獲得這個(gè)對(duì)象的機(jī)會(huì)。此外,它還演示了如何用wrapper使用和保護(hù)非線程安全的類:

//: c13:CriticalSection.java
// Synchronizing blocks instead of entire methods. Also
// demonstrates protection of a non-thread-safe class
// with a thread-safe one.
import java.util.*;
class Pair { // Not thread-safe
private int x, y;
public Pair(int x, int y) {
this.x = x;
this.y = y;
}
public Pair() { this(0, 0); }
public int getX() { return x; }
public int getY() { return y; }
public void incrementX() { x++; }
public void incrementY() { y++; }
public String toString() {
return "x: " + x + ", y: " + y;
}
public class PairValuesNotEqualException
extends RuntimeException {
public PairValuesNotEqualException() {
super("Pair values not equal: " + Pair.this);
}
}
// Arbitrary invariant -- both variables must be equal:
public void checkState() {
if(x != y)
throw new PairValuesNotEqualException();
}
}
// Protect a Pair inside a thread-safe class:
abstract class PairManager {
protected Pair p = new Pair();
private List storage = new ArrayList();
public synchronized Pair getPair() {
// Make a copy to keep the original safe:
return new Pair(p.getX(), p.getY());
}
protected void store() { storage.add(getPair()); }
// A "template method":
public abstract void doTask();
}
// Synchronize the entire method:
class PairManager1 extends PairManager {
public synchronized void doTask() {
p.incrementX();
p.incrementY();
store();
}
}
// Use a critical section:
class PairManager2 extends PairManager {
public void doTask() {
synchronized(this) {
p.incrementX();
p.incrementY();
}
store();
}
}
class PairManipulator extends Thread {
private PairManager pm;
private int checkCounter = 0;
private class PairChecker extends Thread {
PairChecker() { start(); }
public void run() {
while(true) {
checkCounter++;
pm.getPair().checkState();
}
}
}
public PairManipulator(PairManager pm) {
this.pm = pm;
start();
new PairChecker();
}
public void run() {
while(true) {
pm.doTask();
}
}
public String toString() {
return "Pair: " + pm.getPair() +
" checkCounter = " + checkCounter;
}
}
public class CriticalSection {
public static void main(String[] args) {
// Test the two different approaches:
final PairManipulator
pm1 = new PairManipulator(new PairManager1()),
pm2 = new PairManipulator(new PairManager2());
new Timer(true).schedule(new TimerTask() {
public void run() {
System.out.println("pm1: " + pm1);
System.out.println("pm2: " + pm2);
System.exit(0);
}
}, 500); // run() after 500 milliseconds
}
} ///:~

你也看到了,Pair不是一個(gè)線程安全的類,它的invariant(我們隨便定了一個(gè))要求XY的值相同。此外,正如我們前面所看到的,遞增不是線程安全的操作,而這些方法也都不是synchronized的,所以在多線程環(huán)境下,Pair肯定會(huì)出問(wèn)題。

PariManager包含一個(gè)Pair對(duì)象,與此同時(shí)它還控制了外部對(duì)這個(gè)對(duì)象的所有訪問(wèn)。注意,它的兩個(gè)public方法,getPair( )abstract doTash( )都是synchronized。只是后者的同步化處理放到了實(shí)現(xiàn)里面。

PairManager的功能,有的是在abstract的基類里實(shí)現(xiàn)的,有的是在派生類里定義的。用設(shè)計(jì)模式的話來(lái)說(shuō),這就是是Template Method。[71]設(shè)計(jì)模式要求你把會(huì)變的代碼封裝起來(lái);這里會(huì)變的就是模板方法doTask( )。PairManager1把整個(gè)doTask( )同步化了,而PairManager2用關(guān)鍵段部分地同步化了這個(gè)方法。注意,synchronized關(guān)鍵詞不屬于方法的特征簽名,因此覆寫的時(shí)候可以加上去。

PairManager2值得關(guān)注。store( )是一個(gè)protected方法,也就是說(shuō)只有子類才能訪問(wèn),一般的客戶根本是看不到的。所以我們就無(wú)需再把它放到synchronized方法里了。這里它被置于關(guān)鍵段之外。

關(guān)鍵段必須要有對(duì)象來(lái)同步,最常見的就是這個(gè)方法自己所屬的對(duì)象:synchronized(this)。PairManager2就用了這個(gè)方法。這樣,當(dāng)關(guān)鍵段獲得對(duì)象鎖的時(shí)候,其他線程就不能調(diào)用這個(gè)對(duì)象的synchronized方法了。而這樣做的效果就是,大大縮小了同步的范圍。

有時(shí)你還會(huì)碰到要?jiǎng)?chuàng)建一個(gè)單獨(dú)的對(duì)象,然后對(duì)它進(jìn)行同步化的情形,這時(shí)這個(gè)方案就不合適了。下面我們來(lái)演示一下,怎樣讓對(duì)象的方法針對(duì)不同的鎖做同步化,這樣兩個(gè)線程就可以同時(shí)進(jìn)入同一個(gè)對(duì)象了:

//: c13:SyncObject.java
// Synchronizing on another object
import com.bruceeckel.simpletest.*;
class DualSynch {
private Object syncObject = new Object();
public synchronized void f() {
System.out.println("Inside f()");
// Doesn‘t release lock:
try {
Thread.sleep(500);
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Leaving f()");
}
public void g() {
synchronized(syncObject) {
System.out.println("Inside g()");
try {
Thread.sleep(500);
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Leaving g()");
}
}
}
public class SyncObject {
private static Test monitor = new Test();
public static void main(String[] args) {
final DualSynch ds = new DualSynch();
new Thread() {
public void run() {
ds.f();
}
}.start();
ds.g();
monitor.expect(new String[] {
"Inside g()",
"Inside f()",
"Leaving g()",
"Leaving f()"
}, Test.WAIT + Test.IGNORE_ORDER);
}
} ///:~

DualSyncf( )方法針對(duì)this作了同步(整個(gè)方法的同步),而g( )里面則有一個(gè)針對(duì)syncObject的同步段。所以這兩個(gè)同步是相互獨(dú)立的。這一點(diǎn)已經(jīng)在main( )里得到了證明。main( )啟動(dòng)了一個(gè)線程調(diào)用f( ),而它自己又調(diào)用了g( )??梢詮某绦虻妮敵隹吹竭@兩個(gè)方法在同時(shí)運(yùn)行,它們誰(shuí)也沒(méi)有在等另一個(gè)。

回到CriticalSection.java。為了測(cè)試這兩個(gè)PairManager,我們創(chuàng)建了一個(gè)PairManipulator。這個(gè)類會(huì)在線程里運(yùn)行doTask( ),然后在PairChecker內(nèi)部類的實(shí)例里運(yùn)行另一個(gè)線程(檢查invariant)。為了能知道究竟做了多少檢查,我們讓PairChecker每做一次檢查就遞增一次checkCountermain( )創(chuàng)建了兩個(gè)PairManipulator。它會(huì)讓這兩個(gè)線程運(yùn)行一段時(shí)間,等到Timer到期,它會(huì)執(zhí)行run( ),也就是打印這兩個(gè)PairManipulator,然后再退出。程序的輸出大致是這樣的:

pm1: Pair: x: 58892, y: 58892 checkCounter = 44974
pm2: Pair: x: 73153, y: 73153 checkCounter = 100535

雖然每次運(yùn)行的結(jié)果都不一樣,但是總的來(lái)看PairManager1.doTask( )的運(yùn)行次數(shù)不會(huì)多過(guò)PairManager2.doTask( )的,因?yàn)楹笳哂玫氖峭蕉?,因此訪問(wèn)的機(jī)會(huì)較多。通常這就是你選擇使用同步段的原因:讓線程有更多的機(jī)會(huì)訪問(wèn)(在保證安全的前提下)。

當(dāng)然,最后還是要靠程序員:所有訪問(wèn)共享資源的代碼都必須被包進(jìn)同步段里。

線程的狀態(tài)

線程的狀態(tài)可歸納為以下四種:

  1. New: 線程對(duì)象已經(jīng)創(chuàng)建完畢,但尚未啟動(dòng)(start),因此還不能運(yùn)行。
  2. Runnable: 處在這種狀態(tài)下的線程,只要分時(shí)機(jī)制分配給它CPU周期,它就能運(yùn)行。也就是說(shuō),具體到某個(gè)時(shí)點(diǎn),它可能正在運(yùn)行,也可能沒(méi)有運(yùn)行,但是輪到它運(yùn)行的時(shí)候,誰(shuí)都不能阻止它;它沒(méi)有dead,也沒(méi)有被阻塞。
  3. Dead: 要想中止線程,正常的做法是退出run( )。在Java 2以前,你也可以調(diào)用stop( ),不過(guò)現(xiàn)在不建議用這個(gè)辦法了,因?yàn)樗芸赡軙?huì)造成程序運(yùn)行狀態(tài)的不穩(wěn)定。此外還有一個(gè)destroy( )(不過(guò)它還沒(méi)有實(shí)現(xiàn),或許將來(lái)也不會(huì)了,也就是說(shuō)已經(jīng)被放棄了)。后面我們會(huì)講怎樣用其它辦法來(lái)實(shí)現(xiàn)stop( )的功能。
  4. Blocked: 就線程本身而言,它是可以運(yùn)行的,但是有什么別的原因在阻止它運(yùn)行。線程調(diào)度機(jī)制會(huì)直接跳過(guò)blocked的線程,根本不給它分配CPU的時(shí)間。除非它重新進(jìn)入runnable狀態(tài),否則什么都干不了。

進(jìn)入阻塞狀態(tài)

如果線程被阻塞了,那肯定是出了什么問(wèn)題。問(wèn)題可能有以下幾種:

  1. 你用sleep(milliseconds)方法叫線程休眠。在此期間,線程是不能運(yùn)行的。
  2. 你用wait( )方法把線程掛了起來(lái)。除非收到notify( )notifyAll( )消息,否則線程無(wú)法重新進(jìn)入runnable狀態(tài)。這部分內(nèi)容會(huì)在后面講。
  3. 線程在等I/O結(jié)束。
  4. 線程要調(diào)用另一個(gè)對(duì)象的synchronized方法,但是還沒(méi)有得到對(duì)象的鎖。

或許你還在舊代碼里看到過(guò)suspend( )resume( ),不過(guò)Java 2已經(jīng)放棄了這兩個(gè)方法(因?yàn)楹苋菀自斐伤梨i),所以這里就不作介紹了。

線程間的協(xié)作

理解了線程會(huì)相互沖突以及該如何防止這種沖突之后,下一步就該學(xué)習(xí)怎樣讓線程協(xié)同工作了。要做到這一點(diǎn),關(guān)鍵是要讓線程能相互"協(xié)商(handshaking)"。而這個(gè)任務(wù)要由Objectwait( )notify( )來(lái)完成。

waitnotify

首先要強(qiáng)調(diào),線程sleep( )的時(shí)候并不釋放對(duì)象的鎖,但是wait( )的時(shí)候卻會(huì)釋放對(duì)象的鎖。也就是說(shuō)在線程wait( )期間,別的線程可以調(diào)用它的synchronized方法。當(dāng)線程調(diào)用了某個(gè)對(duì)象wait( )方法之后,它就中止運(yùn)行并釋放那個(gè)對(duì)象鎖了。

Java有兩種wait( )。第一種需要一個(gè)以毫秒記的時(shí)間作參數(shù),它的意思和sleep( )一樣,都是:"暫停一段時(shí)間。"區(qū)別在于:

  1. wait( )會(huì)釋放對(duì)象的鎖。
  2. 除了時(shí)間到了,wait( )還可以用notify( )notifyAll( )來(lái)中止

第二種wait( )不需要任何參數(shù);它的用途更廣。線程調(diào)用了這種wait( )之后,會(huì)一直等下去,直到(有別的線程調(diào)用了這個(gè)對(duì)象的)notify( )notifyAll( )。

sleep( )屬于Thread不同,wait( ), notify( ), 和notifyAll( )是根Object的方法。雖然這樣做法(把專為多線程服務(wù)的方法放到通用的根類里面)看上去有些奇怪,但卻是必要的。因?yàn)樗鼈兯倏氐氖敲總€(gè)對(duì)象都會(huì)有的鎖。所以結(jié)論就是,你可以在類的synchronized方法里調(diào)用wait( ),至于它繼不繼承Thread,實(shí)沒(méi)實(shí)現(xiàn)Runnable已經(jīng)無(wú)所謂了。實(shí)際上你也只能在synchronized方法里或synchronized段里調(diào)用wait( ),notify( )notifyAll( )(sleep( )則沒(méi)有這個(gè)限制,因?yàn)樗粚?duì)鎖進(jìn)行操作)。如果你在非synchronized方法里調(diào)用了這些方法,程序還是可以編譯的,但是一運(yùn)行就會(huì)出一個(gè)IllegalMonitorStateException。這個(gè)異常帶著一個(gè)挺讓人費(fèi)解的"current thread not owner"消息。這個(gè)消息的意思是,如果線程想調(diào)用對(duì)象的wait( ), notify( ),或notifyAll( )方法,必須先"擁有"(得到)這個(gè)對(duì)象的鎖。

你可以讓另一個(gè)對(duì)象來(lái)操控這個(gè)對(duì)象的鎖。要想這么做,第一步是先獲取對(duì)象的鎖。比方說(shuō)要想調(diào)用對(duì)象xnotify( ),可以在xsynchronized段里:

synchronized(x) {
x.notify();
}

通常情況下,如果條件是由方法之外的其他力量所控制的(最常見的就是要由其他線程修改),那么你就應(yīng)該用wait( )。你總不希望閑著的時(shí)候還讓線程不停地作測(cè)試吧;這種"瞎忙活"是對(duì)CPU資源的巨大浪費(fèi)。所以wait( )能讓你在等待世道改變的同時(shí)讓線程休眠,當(dāng)(其他線程調(diào)用了對(duì)象的)notify( )notifyAll( )的時(shí)候,線程自會(huì)醒來(lái),然后檢查條件是不是改變了。所以說(shuō)wait( )提供了一種同步線程間的活動(dòng)的方法。

舉個(gè)例子,假設(shè)有這么一間飯店,它有一個(gè)廚師和一個(gè)服務(wù)員。服務(wù)員必須等廚師燒菜。廚師做完一道菜就通知服務(wù)員,服務(wù)員上完菜接著等。這是一個(gè)很精彩的線程合作的例子:廚師就是producer,而服務(wù)員就是consumer。下面我們用代碼來(lái)為這個(gè)故事建模:

//: c13:Restaurant.java
// The producer-consumer approach to thread cooperation.
import com.bruceeckel.simpletest.*;
class Order {
private static int i = 0;
private int count = i++;
public Order() {
if(count == 10) {
System.out.println("Out of food, closing");
System.exit(0);
}
}
public String toString() { return "Order " + count; }
}
class WaitPerson extends Thread {
private Restaurant restaurant;
public WaitPerson(Restaurant r) {
restaurant = r;
start();
}
public void run() {
while(true) {
while(restaurant.order == null)
synchronized(this) {
try {
wait();
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println(
"Waitperson got " + restaurant.order);
restaurant.order = null;
}
}
}
class Chef extends Thread {
private Restaurant restaurant;
private WaitPerson waitPerson;
public Chef(Restaurant r, WaitPerson w) {
restaurant = r;
waitPerson = w;
start();
}
public void run() {
while(true) {
if(restaurant.order == null) {
restaurant.order = new Order();
System.out.print("Order up! ");
synchronized(waitPerson) {
waitPerson.notify();
}
}
try {
sleep(100);
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
public class Restaurant {
private static Test monitor = new Test();
Order order; // Package access
public static void main(String[] args) {
Restaurant restaurant = new Restaurant();
WaitPerson waitPerson = new WaitPerson(restaurant);
Chef chef = new Chef(restaurant, waitPerson);
monitor.expect(new String[] {
"Order up! Waitperson got Order 0",
"Order up! Waitperson got Order 1",
"Order up! Waitperson got Order 2",
"Order up! Waitperson got Order 3",
"Order up! Waitperson got Order 4",
"Order up! Waitperson got Order 5",
"Order up! Waitperson got Order 6",
"Order up! Waitperson got Order 7",
"Order up! Waitperson got Order 8",
"Order up! Waitperson got Order 9",
"Out of food, closing"
}, Test.WAIT);
}
} ///:~

Order是一個(gè)會(huì)自己計(jì)數(shù)的類,同時(shí)它還負(fù)責(zé)中止程序;數(shù)到10的時(shí)候,它會(huì)調(diào)用System.exit( )。

WaitPerson必須知道它是為哪家Restaurant服務(wù)的,所以他得從那家飯店的"帳臺(tái)",也就是restaurant.order,拿單子。WaitPersonrun( )調(diào)用了wait( )方法,所以除非Chefnotify( )叫醒他,否則他會(huì)一直等下去。這是個(gè)非常簡(jiǎn)單的程序,所以我們知道只會(huì)有一個(gè)線程在等WaitPerson的鎖:就是WaitPerson線程自己,所以notify( )是安全的。如果情況比較復(fù)雜,有很多線程在等一個(gè)對(duì)象的鎖,那么你可能就不知道該叫醒哪個(gè)線程了。遇到這種情況,可以用notifyAll( ),這個(gè)方法會(huì)叫醒所有在等這個(gè)對(duì)象的鎖的線程。這樣線程就能自行判斷本次喚醒是否與自己有關(guān)。

請(qǐng)注意,wait( )所在的while循環(huán),它的測(cè)試條件正是wait要等的東西。乍看上去真有點(diǎn)奇怪——如果你等的是訂單,那么醒過(guò)來(lái)之后肯定會(huì)有訂單,難道不是嗎?問(wèn)題在于在多線程環(huán)境下,還沒(méi)等WaitPerson醒過(guò)來(lái),訂單就可能被其他線程給搶走了。所以唯一安全的做法就是套用下面這個(gè)wait( )定式:

while(conditionIsNotMet)
wait( );

用管道進(jìn)行線程間的I/O操作

在很多情況下,線程也可以利用I/O來(lái)進(jìn)行通信。多線程類庫(kù)會(huì)提供一種"管道(pipes)"來(lái)實(shí)現(xiàn)線程間的I/O。對(duì)Java I/O類庫(kù)而言,這個(gè)類就是PipedWriter(可以讓線程往管道里寫數(shù)據(jù))和PipedReader(讓另一個(gè)線程從這個(gè)管道里讀數(shù)據(jù))。你可以把它理解成"producer-consumer"問(wèn)題的一個(gè)變型,而管道則提供了一個(gè)現(xiàn)成的解決方案。

下面我們來(lái)舉一個(gè)簡(jiǎn)單的例子。兩個(gè)線程利用管道來(lái)進(jìn)行通信:

//: c13:PipedIO.java
// Using pipes for inter-thread I/O
import java.io.*;
import java.util.*;
class Sender extends Thread {
private Random rand = new Random();
private PipedWriter out = new PipedWriter();
public PipedWriter getPipedWriter() { return out; }
public void run() {
while(true) {
for(char c = ‘A‘; c <= ‘z‘; c++) {
try {
out.write(c);
sleep(rand.nextInt(500));
} catch(Exception e) {
throw new RuntimeException(e);
}
}
}
}
}
class Receiver extends Thread {
private PipedReader in;
public Receiver(Sender sender) throws IOException {
in = new PipedReader(sender.getPipedWriter());
}
public void run() {
try {
while(true) {
// Blocks until characters are there:
System.out.println("Read: " + (char)in.read());
}
} catch(IOException e) {
throw new RuntimeException(e);
}
}
}
public class PipedIO {
public static void main(String[] args) throws Exception {
Sender sender = new Sender();
Receiver receiver = new Receiver(sender);
sender.start();
receiver.start();
new Timeout(4000, "Terminated");
}
} ///:~

SenderReceiver表示兩個(gè)需要相互通信才能完成任務(wù)的線程。SenderPipedWriter是一個(gè)獨(dú)立的對(duì)象;但是ReceiverPipedReader必須與構(gòu)造函數(shù)里的PipedWriter相關(guān)聯(lián)。Sender把數(shù)據(jù)寫入Writer,然后休眠一段時(shí)間。Receiver則不sleep( )wait( ),它read( )不到數(shù)據(jù)的時(shí)候,自動(dòng)就阻塞了。這樣一來(lái)你就能不用wait( )循環(huán)就獲得producer-consumer的效果了。

注意,main( )先創(chuàng)建了senderreceiver,再調(diào)用了start( )。如果你沒(méi)創(chuàng)建完對(duì)象就啟動(dòng)線程,那么管道在不同的平臺(tái)上的行為就有可能會(huì)不一致。

更復(fù)雜的協(xié)同

這里只講了最基本的協(xié)同方式(即通常籍由wait( ),notify( )/notifyAll( )來(lái)實(shí)現(xiàn)的producer-consumer模式)。它已經(jīng)能解決絕大多數(shù)的線程協(xié)同問(wèn)題了,但是在高級(jí)的教科書里還有很多更復(fù)雜協(xié)同方式(特別是本章末尾所推薦的,由Doug Lea編著的Concurrent Programming in Java, 2nd Edition)

死鎖

由于線程能被阻塞,更由于synchronized方法能阻止其它線程訪問(wèn)本對(duì)象,因此有可能會(huì)出現(xiàn)如下這種情況:線程一在等線程二(釋放某個(gè)對(duì)象),線程二又在等線程三,這樣依次排下去直到有個(gè)線程在等線程一。這樣就形成了一個(gè)環(huán),每個(gè)線程都在等對(duì)方釋放資源,而它們誰(shuí)都不能運(yùn)行。這就是所謂的死鎖(deadlock)。

如果程序一運(yùn)行就死鎖,那倒也簡(jiǎn)單了。你可以馬上著手解決這個(gè)問(wèn)題。但真正的麻煩在于,程序看上去能正常運(yùn)行,但是卻潛伏著會(huì)引起死鎖的隱患?;蛟S你認(rèn)為這里根本就不可能會(huì)有死鎖,而bug也就這樣潛伏下來(lái)了。直到有一天,讓某個(gè)用戶給撞上了(而且這種bug還很可能是不可重復(fù)的)。所以對(duì)并發(fā)編程來(lái)說(shuō),防止死鎖是設(shè)計(jì)階段的一個(gè)重要任務(wù)。

下面我們來(lái)看看由Dijkstra發(fā)現(xiàn)的經(jīng)典的死鎖場(chǎng)景:哲學(xué)家吃飯問(wèn)題。原版的故事里有五個(gè)哲學(xué)家(不過(guò)我們的例程里允許有任意數(shù)量)。這些哲學(xué)家們只做兩件事,思考和吃飯。他們思考的時(shí)候,不需要任何共享資源,但是吃飯的時(shí)候,就必須坐到餐桌旁。餐桌上的餐具是有限的。原版的故事里,餐具是叉子,吃飯的時(shí)候要用兩把叉子把面條從碗里撈出來(lái)。但是很明顯,把叉子換成筷子會(huì)更合理,所以:一個(gè)哲學(xué)家需要兩根筷子才能吃飯。

現(xiàn)在引入問(wèn)題的關(guān)鍵:這些哲學(xué)家很窮,只買得起五根筷子。他們坐成一圈,兩個(gè)人的中間放一根筷子。哲學(xué)家吃飯的時(shí)候必須同時(shí)得到左手邊和右手邊的筷子。如果他身邊的任何一位正在使用筷子,那他只有等著。

這個(gè)問(wèn)題之所以有趣就在于,它演示了這么一個(gè)程序,它看上去似乎能正常運(yùn)行,但是卻容易引起死鎖。你可以自己試試,用命令行參數(shù)調(diào)節(jié)哲學(xué)家的數(shù)量和思考的時(shí)間。如果有很多哲學(xué)家,而且/或者他們思考的時(shí)間很長(zhǎng),或許你永遠(yuǎn)也碰不到死鎖,但是死鎖的可能性總還是在的。默認(rèn)的命令行參數(shù)會(huì)讓它很快地死鎖:

//: c13:DiningPhilosophers.java
// Demonstrates how deadlock can be hidden in a program.
// {Args: 5 0 deadlock 4}
import java.util.*;
class Chopstick {
private static int counter = 0;
private int number = counter++;
public String toString() {
return "Chopstick " + number;
}
}
class Philosopher extends Thread {
private static Random rand = new Random();
private static int counter = 0;
private int number = counter++;
private Chopstick leftChopstick;
private Chopstick rightChopstick;
static int ponder = 0; // Package access
public Philosopher(Chopstick left, Chopstick right) {
leftChopstick = left;
rightChopstick = right;
start();
}
public void think() {
System.out.println(this + " thinking");
if(ponder > 0)
try {
sleep(rand.nextInt(ponder));
} catch(InterruptedException e) {
throw new RuntimeException(e);
}
}
public void eat() {
synchronized(leftChopstick) {
System.out.println(this + " has "
+ this.leftChopstick + " Waiting for "
+ this.rightChopstick);
synchronized(rightChopstick) {
System.out.println(this + " eating");
}
}
}
public String toString() {
return "Philosopher " + number;
}
public void run() {
while(true) {
think();
eat();
}
}
}
public class DiningPhilosophers {
public static void main(String[] args) {
if(args.length < 3) {
System.err.println("usage:\n" +
"java DiningPhilosophers numberOfPhilosophers " +
"ponderFactor deadlock timeout\n" +
"A nonzero ponderFactor will generate a random " +
"sleep time during think().\n" +
"If deadlock is not the string " +
"‘deadlock‘, the program will not deadlock.\n" +
"A nonzero timeout will stop the program after " +
"that number of seconds.");
System.exit(1);
}
Philosopher[] philosopher =
new Philosopher[Integer.parseInt(args[0])];
Philosopher.ponder = Integer.parseInt(args[1]);
Chopstick
left = new Chopstick(),
right = new Chopstick(),
first = left;
int i = 0;
while(i < philosopher.length - 1) {
philosopher[i++] =
new Philosopher(left, right);
left = right;
right = new Chopstick();
}
if(args[2].equals("deadlock"))
philosopher[i] = new Philosopher(left, first);
else // Swapping values prevents deadlock:
philosopher[i] = new Philosopher(first, left);
// Optionally break out of program:
if(args.length >= 4) {
int delay = Integer.parseInt(args[3]);
if(delay != 0)
new Timeout(delay * 1000, "Timed out");
}
}
} ///:~

ChopstickPhilosopher都包含一個(gè)能自動(dòng)遞增的static counter。每個(gè)Philosopher有兩個(gè)reference,一個(gè)表示左邊那根Chopstick,另一個(gè)表示右邊那根;Philosopher吃飯之前必須拿到這兩根筷子。

staticponder表示哲學(xué)家要花多長(zhǎng)時(shí)間思考。如果這個(gè)值非零,則think( )會(huì)用它來(lái)生成一個(gè)隨機(jī)的休眠時(shí)間。我們用這種方法證明,如果線程(也就是哲學(xué)家)花在其它事情上(思考)的時(shí)間多了,那么它們使用共享資源(筷子)的機(jī)會(huì)就少了,因而程序就不太容易死鎖了,但事實(shí)并非如此。

請(qǐng)看eat( )Philosophersynchronized 左邊那根筷子。如果得不到,他就等,這時(shí)他處于阻塞狀態(tài)。得到左邊那根筷子之后,他又用相同的方法去申請(qǐng)右邊的筷子。吃完之后,他也是先放左邊的,再放右邊的。

Philosopherrun( )里不停的思考和吃飯。

main( )至少需要三個(gè)參數(shù),如果沒(méi)給夠,它就打印使用的信息。第三個(gè)參數(shù)可以是"deadlock"字符串,這時(shí)程序會(huì)啟動(dòng)會(huì)引發(fā)死鎖的版本。任何其它字符串都會(huì)運(yùn)行其非死鎖的版本。最后一個(gè)參數(shù)(可選的)是一個(gè)時(shí)限,程序運(yùn)行這段時(shí)間之后(以秒為單位,不論是否死鎖都會(huì))都會(huì)退出。為了能自動(dòng)地進(jìn)行測(cè)試,這個(gè)時(shí)限參數(shù)是必須的。

除了創(chuàng)建Philosopher數(shù)組,設(shè)置ponder的值,main( )還創(chuàng)建了兩個(gè)Chopstick對(duì)象,第一個(gè)對(duì)象放在first變量里。除了最后一個(gè)對(duì)象,其他Philosopher的初始化過(guò)程都一樣:先新建一個(gè)Philosopher,把leftright Chopstick傳給他,然后把rightChopstick傳到left,再為right創(chuàng)建一個(gè)新的Chopstick。下一個(gè)Philosopher再繼續(xù)用這兩根筷子。

在會(huì)死鎖版本里,我們把先前存在first里的那根筷子放在最后那個(gè)Philosopher的左邊。由于最后一位Philosopher正好坐在第一位的旁邊,這樣他們就必須共享first筷子了。這種安排有可能會(huì)造成這種情況:每個(gè)哲學(xué)家手里都攥著一根筷子,他在等他旁邊那位放下手里的筷子,這樣他才能吃飯,但程序已經(jīng)死鎖了。

試著用各種命令行參數(shù)來(lái)運(yùn)行程序,看看程序會(huì)怎樣運(yùn)行,特別是要注意在哪些情況下程序不會(huì)死鎖。

在告訴你如何修補(bǔ)這個(gè)問(wèn)題之前,先了解一下只有在下述四個(gè)條件同時(shí)滿足的情況下,死鎖才會(huì)發(fā)生:

  1. 互斥:也許線程會(huì)用到很多資源,但其中至少要有一項(xiàng)是不能共享的。這里,一根筷子同一時(shí)刻只能供一個(gè)哲學(xué)家使用。
  2. 至少要有一個(gè)進(jìn)程會(huì)在占用一項(xiàng)資源的同時(shí)還在等另一項(xiàng)正被其它進(jìn)程所占用的資源。也就是說(shuō),要想讓死鎖發(fā)生,哲學(xué)家必須攥著一根筷子等另一根。
  3. (調(diào)度系統(tǒng)或其他進(jìn)程)不能從進(jìn)程里搶資源。所有進(jìn)程都必須正常的釋放資源。我們的哲學(xué)家都彬彬有禮,不會(huì)從他的鄰座手里搶筷子。
  4. 必需要有等待的環(huán)。一個(gè)進(jìn)程在一個(gè)已經(jīng)被另一進(jìn)程搶占了的資源,而那個(gè)進(jìn)程又在等另一個(gè)被第三個(gè)進(jìn)程搶占了的資源,以此類推,直到有個(gè)進(jìn)程正在等被第一個(gè)進(jìn)程搶占了的資源,這樣就形成了癱瘓性的阻塞了。這里,由于每個(gè)哲學(xué)家都是先左后右的拿筷子,所以有可能會(huì)造成等待的環(huán)。在例程中,我們修改了最后一位哲學(xué)家的構(gòu)造函數(shù),讓他先右后左地拿筷子,從而破解了死鎖。

由于死鎖要同時(shí)滿足這四個(gè)條件,所用只要去掉其中一個(gè)就能防止死鎖。對(duì)于這個(gè)程序,預(yù)防死鎖最簡(jiǎn)單的辦法是破掉第四條。之所以會(huì)死鎖,是因?yàn)槊總€(gè)哲學(xué)家都以固定的次序拿筷子:先左后右。因此就有可能會(huì)發(fā)生每個(gè)人的左手都捏著一根筷子,然后等右邊那根,這樣就形成了等待環(huán)了。如果初始化的時(shí)候讓最后那位哲學(xué)家先右后左的拿筷子,那他就不會(huì)再礙著他左邊那位去拿右邊的筷子了,因此等待環(huán)就被破了。這只是這個(gè)問(wèn)題的解決方法之一,你還可以用破解其它條件的辦法來(lái)解決這個(gè)問(wèn)題(要想學(xué)得更細(xì),可以去看更高級(jí)的教課書)。

Java語(yǔ)言沒(méi)有提供任何能預(yù)防死鎖的機(jī)制,所以只能靠你來(lái)設(shè)計(jì)了。對(duì)于那些排錯(cuò)的人來(lái)說(shuō),這可不是什么好消息。

停止線程的正確方法

為了降低死鎖的發(fā)生幾率,Java 2放棄了Threadstop( ),suspend( )resume( )方法。

之所以要放棄stop( )是因?yàn)?,它不?huì)釋放對(duì)象的鎖,因此如果對(duì)象正處于無(wú)效狀態(tài)(也就是被破壞了),其它線程就可能會(huì)看到并且修改它了。這個(gè)問(wèn)題的后果可能非常微秒,因此難以察覺(jué)。所以別再用stop( )了,相反你應(yīng)該設(shè)置一個(gè)旗標(biāo)(flag)來(lái)告訴線程什么時(shí)候該停止。下面是一個(gè)簡(jiǎn)單的例子:

//: c13:Stopping.java
// The safe way to stop a thread.
import java.util.*;
class CanStop extends Thread {
// Must be volatile:
private volatile boolean stop = false;
private int counter = 0;
public void run() {
while(!stop && counter < 10000) {
System.out.println(counter++);
}
if(stop)
System.out.println("Detected stop");
}
public void requestStop() { stop = true; }
}
public class Stopping {
public static void main(String[] args) {
final CanStop stoppable = new CanStop();
stoppable.start();
new Timer(true).schedule(new TimerTask() {
public void run() {
System.out.println("Requesting stop");
stoppable.requestStop();
}
}, 500); // run() after 500 milliseconds
}
} ///:~     

stop必須是volatile的,這樣才能確保run( )方法能看到它(否則它會(huì)使用本地的緩存值)。這個(gè)線程的"任務(wù)"是打印10,000個(gè)數(shù)字,所以當(dāng)counter >= 10000或有人要它停下來(lái)的時(shí)候,它就結(jié)束了。注意requestStop( )不是synchronized,因?yàn)?span id="yigechf" class=original_words>stop既是boolean(改成true是一個(gè)原子操作)又是volatile的。

main( )啟動(dòng)完CanStop之后設(shè)置了一個(gè)Timer,讓它過(guò)半秒自動(dòng)調(diào)用requestStop( )Timer的構(gòu)造函數(shù)里的true參數(shù)的任務(wù)是,把這個(gè)線程設(shè)成守護(hù)線程,這樣它就不會(huì)阻止程序退出了。

打斷受阻的線程

有時(shí)線程受阻之后就不能再做輪詢了,比如在等輸入,這時(shí)你就不能像前面那樣去查詢旗標(biāo)了。碰到這種情況,你可以用Thread.interrupt( )方法打斷受阻的線程:

//: c13:Interrupt.java
// Using interrupt() to break out of a blocked thread.
import java.util.*;
class Blocked extends Thread {
public Blocked() {
System.out.println("Starting Blocked");
start();
}
public void run() {
try {
synchronized(this) {
wait(); // Blocks
}
} catch(InterruptedException e) {
System.out.println("Interrupted");
}
System.out.println("Exiting run()");
}
}
public class Interrupt {
static Blocked blocked = new Blocked();
public static void main(String[] args) {
new Timer(true).schedule(new TimerTask() {
public void run() {
System.out.println("Preparing to interrupt");
blocked.interrupt();
blocked = null; // to release it
}
}, 2000); // run() after 2000 milliseconds
}
} ///:~

Blocked.run( )里面的wait( )使線程受阻。Timer期滿之后,會(huì)調(diào)用blockedinterrupt( )方法。最后要把blocked的 reference設(shè)成null,這樣垃圾回收器才能把它給回收了(在我們這個(gè)程序里,這一步不是必須的,但對(duì)會(huì)運(yùn)行很長(zhǎng)時(shí)間的程序來(lái)說(shuō),這一步很重要)。

線程組

線程組是一個(gè)裝線程的容器(collection)。用Joshua Bloch[72],也就是負(fù)責(zé)修補(bǔ)和改進(jìn)JDK 1.2的Java容器類庫(kù)的那位Sun的軟件架構(gòu)師,的話來(lái)講,它的意義可以概括為:

"最好把線程組看成是一次不成功的實(shí)驗(yàn),或者就當(dāng)它根本不存在。"

如果你(和我一樣)曾經(jīng)在線程組上花了很多時(shí)間和精力,你就會(huì)覺(jué)得很奇怪,為什么Sun沒(méi)有在此之前就這個(gè)問(wèn)題發(fā)表過(guò)更多的官方申明呢(過(guò)去幾年里,Java方面的類似問(wèn)題還有很多)。對(duì)于這種問(wèn)題,諾貝爾經(jīng)濟(jì)學(xué)獎(jiǎng)得主Joseph Stiglitz有一條人生哲學(xué)可以解釋,[73] 這就是所謂的承諾升級(jí)理論:

"延續(xù)錯(cuò)誤的代價(jià)是別人付的,但是承認(rèn)錯(cuò)誤的代價(jià)是由你付的。"

線程組還剩一個(gè)小用途。如果組里的線程拋出一個(gè)沒(méi)有被(異常處理程序)捕捉到的異常,就會(huì)啟動(dòng)ThreadGroup.uncaughtException( )。而它會(huì)在標(biāo)準(zhǔn)錯(cuò)誤流上打印出棧的軌跡。要想修改這個(gè)行為,你必須覆寫這個(gè)方法。

總結(jié)

要懂得什么時(shí)候用什么時(shí)候用并發(fā),什么時(shí)候不用并發(fā),這點(diǎn)非常重要。使用并發(fā)的主要理由包括:要管理大量的任務(wù),讓它們同時(shí)運(yùn)行以提高系統(tǒng)的利用率(包括在多CPU上透明的分配負(fù)載);更合理的組織代碼;以及方便用戶。平衡負(fù)載的一個(gè)經(jīng)典案例是在等待I/O的同時(shí)做計(jì)算。方便用戶的經(jīng)典案例是在用戶下載大文件的時(shí)候監(jiān)控"stop"按鈕。

線程還有一個(gè)額外的好處,那就是它提供了"輕型"(100個(gè)指令級(jí)的)運(yùn)行環(huán)境(execution context)的切換,而進(jìn)程環(huán)境(process context)的切換則是"重型"的(數(shù)千個(gè)指令)。由于所有線程會(huì)共享進(jìn)程的內(nèi)存空間,所以輕型的環(huán)境切換只會(huì)改變程序執(zhí)行順序和本地變量。而重型的進(jìn)程環(huán)境切換則必須交換全部的內(nèi)存空間。

多線程的主要缺點(diǎn)包括:

  1. 等待共享資源的時(shí)候,運(yùn)行速度會(huì)慢下來(lái)。
  2. 線程管理需要額外的CPU開銷。
  3. 如果設(shè)計(jì)得不不合理,程序會(huì)變得異常負(fù)責(zé)。
  4. 會(huì)引發(fā)一些不正常的狀態(tài),像饑餓(starving),競(jìng)爭(zhēng)(racing),死鎖(deadlock),活鎖(livelock)。
  5. 不同平臺(tái)上會(huì)有一些不一致。比如我在開發(fā)本書例程時(shí)發(fā)現(xiàn),在有些平臺(tái)下競(jìng)爭(zhēng)很快就出現(xiàn),但是換了臺(tái)機(jī)器,它根本就不出現(xiàn)。如果你在后者搞開發(fā),然后發(fā)布到前者,那可就慘了。

線程的難點(diǎn)在于多個(gè)線程會(huì)共享同一項(xiàng)資源——比如對(duì)象的內(nèi)存——而你又必須確保同一時(shí)刻不會(huì)有兩個(gè)或兩個(gè)以上的線程去訪問(wèn)那項(xiàng)資源。這就需要合理地使用synchronized關(guān)鍵詞了,但是用之前必須完全理解,否則它會(huì)悄悄地地把死鎖了帶進(jìn)來(lái)。

此外線程的運(yùn)用方面還有某種藝術(shù)。Java的設(shè)計(jì)思想是,讓你能根據(jù)需要?jiǎng)?chuàng)建任意多的對(duì)象來(lái)解決問(wèn)題,至少理論上如此。(對(duì)Java來(lái)說(shuō)創(chuàng)建數(shù)以百萬(wàn)計(jì)的對(duì)象,比如工程方面的有限元分析,還不太現(xiàn)實(shí)。)但是你能創(chuàng)建的線程數(shù)量應(yīng)該還是有一個(gè)上限的,因?yàn)榈搅诉@個(gè)數(shù)量,線程就僵掉了。這個(gè)臨界點(diǎn)很難找,通常由OS和JVM決定;或許是一百以內(nèi),也可能是幾千。不過(guò)通常你只需創(chuàng)建幾個(gè)線程就能解決問(wèn)題了,所以這還不算是什么限制;但是對(duì)于更為通用的設(shè)計(jì),這就是一個(gè)限制了。

線程方面一個(gè)重要,但卻不那么直觀的結(jié)論。那就是,通常你可以在run( )的主循環(huán)里插上yield( ),然后讓線程調(diào)度機(jī)制幫你加快程序的運(yùn)行。這絕對(duì)是一種藝術(shù),特別是當(dāng)?shù)却娱L(zhǎng)之后,性能卻上升了。之所以會(huì)這樣是因?yàn)?,較短的延遲會(huì)使正在運(yùn)行的線程還沒(méi)準(zhǔn)備好休眠就收到休眠結(jié)束的信號(hào),這樣為了能讓線程干完工作之后再休眠,調(diào)度機(jī)制不得不先把它停下來(lái)再喚醒它。額外的運(yùn)行環(huán)境的切換會(huì)導(dǎo)致運(yùn)行速度的下降,而yield( )sleep( )則可以防止這種多余的切換。要理解這個(gè)問(wèn)題有多麻煩還真得好好想想。

要想更深入地探討多線程問(wèn)題,推薦你看Concurrent Programming in Java, 2nd Edition,作者Doug Lea, Addison-Wesley, 2000出版。


[68] Runnable在Java 1.0里就有了,而內(nèi)部類是Java 1.1新加的。這一點(diǎn)部分地解釋了為什么會(huì)有Runnable。此外,傳統(tǒng)的多線程架構(gòu)較少涉及對(duì)象,它關(guān)注的主要是函數(shù)該如何運(yùn)行。就我個(gè)人的習(xí)慣,只要有機(jī)會(huì)就繼承Thread,我覺(jué)得這么做更有條理,靈活性也高。

[69] 有些例子是在雙CPU的Win2K系統(tǒng)上開發(fā)的,所以沖突很快就產(chǎn)生了。但是在單CPU的機(jī)器上,很可能要等很長(zhǎng)時(shí)間才會(huì)有沖突——多線程之所以會(huì)這么難,就是因?yàn)樗?jīng)常會(huì)有這種讓人很惱火的問(wèn)題。試想你在單CPU的機(jī)器上開發(fā)了多線程的程序,測(cè)試下來(lái)一切正常,但是當(dāng)你把程序部署到多CPU系統(tǒng)時(shí),一運(yùn)行它就崩潰了。

[70] 受Joshua Bloch的Effective Java, Addison-Wesley 2001,190頁(yè)的啟發(fā)。

[71] 參見由Gamma等著的Design Patterns, Addison-Wesley 1995.

[72] Effective Java,作者Joshua Bloch, Addison-Wesley 2001, 第 211頁(yè)。

[73] 在Java的發(fā)展史上,類似的項(xiàng)目還有很多。為什么要半途而廢?——我曾經(jīng)不止一次地問(wèn)過(guò)這個(gè)問(wèn)題??磥?lái)這就是答案。

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多

    日韩av亚洲一区二区三区| 国产亚洲神马午夜福利| 深夜视频在线观看免费你懂| 高清在线精品一区二区| 亚洲最新中文字幕在线视频| 真实偷拍一区二区免费视频| 日本午夜免费福利视频| 午夜精品一区二区三区国产| 国产精品视频第一第二区| 欧美精品久久99九九| 中文字幕高清不卡一区| 激情三级在线观看视频| 手机在线不卡国产视频| 国产精品内射婷婷一级二级 | 懂色一区二区三区四区| 中文字幕乱子论一区二区三区| 又色又爽又无遮挡的视频| 亚洲高清一区二区高清| 99久久国产精品亚洲| 精品少妇人妻av一区二区蜜桃| 国内真实露脸偷拍视频| 国产偷拍精品在线视频| 亚洲最新的黄色录像在线| 亚洲妇女作爱一区二区三区| 欧美日韩最近中国黄片| 日本精品理论在线观看| 国产精品欧美一区两区| 好吊视频一区二区在线| 亚洲视频一级二级三级| 日韩在线视频精品中文字幕| 出差被公高潮久久中文字幕| 亚洲一区二区三区福利视频| 婷婷激情四射在线观看视频| 一区中文字幕人妻少妇 | 中文文精品字幕一区二区| 中文字幕一区二区熟女| 亚洲av日韩一区二区三区四区 | 国产成人亚洲精品青草天美| 日本午夜免费观看视频| 正在播放国产又粗又长| 国产三级黄片在线免费看|