一区二区三区日韩精品-日韩经典一区二区三区-五月激情综合丁香婷婷-欧美精品中文字幕专区

分享

分布式基礎(chǔ)學(xué)習(xí)【二】 —— 分布式計算系統(tǒng)(Map/Reduce)

 gerial 2011-11-23

分布式基礎(chǔ)學(xué)習(xí)【二】 —— 分布式計算系統(tǒng)(Map/Reduce)

二. 分布式計算(Map/Reduce)

分布式式計算,同樣是一個寬泛的概念,在這里,它狹義的指代,按Google Map/Reduce框架所設(shè)計的分布式框架。在Hadoop中,分布式文件系統(tǒng),很大程度上,是為各種分布式計算需求所服務(wù)的。我們說分布式文件系統(tǒng)就是加了分布式的文件系統(tǒng),類似的定義推廣到分布式計算上,我們可以將其視為增加了分布式支持的計算函數(shù)。 從計算的角度上看,Map/Reduce框架接受各種格式的鍵值對文件作為輸入,讀取計算后,最終生成自定義格式的輸出文件。而從分布式的角度上看,分布 式計算的輸入文件往往規(guī)模巨大,且分布在多個機器上,單機計算完全不可支撐且效率低下,因此Map/Reduce框架需要提供一套機制,將此計算擴展到無 限規(guī)模的機器集群上進(jìn)行。依照這樣的定義,我們對整個Map/Reduce的理解,也可以分別沿著這兩個流程去看。。。
在Map/Reduce框架中,每一次計算請求,被稱為作業(yè)。在分布式計算Map/Reduce框架中,為了完成這個作業(yè),它進(jìn)行兩步走的戰(zhàn)略,首先是將其拆分成若干個Map任務(wù), 分配到不同的機器上去執(zhí)行,每一個Map任務(wù)拿輸入文件的一部分作為自己的輸入,經(jīng)過一些計算,生成某種格式的中間文件,這種格式,與最終所需的文件格式 完全一致,但是僅僅包含一部分?jǐn)?shù)據(jù)。因此,等到所有Map任務(wù)完成后,它會進(jìn)入下一個步驟,用以合并這些中間文件獲得最后的輸出文件。此時,系統(tǒng)會生成若 干個Reduce任務(wù),同樣也是分配到不同的機器去執(zhí)行,它的目標(biāo),就是將若干個Map任務(wù)生成的中間文件為匯總到最后的輸出文件中去。當(dāng)然,這個匯總不總會像1 + 1 = 2那么直接了當(dāng),這也就是Reduce任務(wù)的價值所在。經(jīng)過如上步驟,最終,作業(yè)完成,所需的目標(biāo)文件生成。整個算法的關(guān)鍵,就在于增加了一個中間文件生成的流程,大大提高了靈活性,使其分布式擴展性得到了保證。。。

I. 術(shù)語對照

和分布式文件系統(tǒng)一樣,Google、Hadoop和....我,各執(zhí)一種方式表述統(tǒng)一概念,為了保證其統(tǒng)一性,特有下表。。。

文中翻譯 Hadoop術(shù)語 Google術(shù)語 相關(guān)解釋
作業(yè) Job Job 用戶的每一個計算請求,就稱為一個作業(yè)。
作業(yè)服務(wù)器 JobTracker Master 用戶提交作業(yè)的服務(wù)器,同時,它還負(fù)責(zé)各個作業(yè)任務(wù)的分配,管理所有的任務(wù)服務(wù)器。
任務(wù)服務(wù)器 TaskTracker Worker 任勞任怨的工蜂,負(fù)責(zé)執(zhí)行具體的任務(wù)。
任務(wù) Task Task 每一個作業(yè),都需要拆分開了,交由多個服務(wù)器來完成,拆分出來的執(zhí)行單位,就稱為任務(wù)。
備份任務(wù) Speculative Task Buckup Task 每一個任務(wù),都有可能執(zhí)行失敗或者緩慢,為了降低為此付出的代價,系統(tǒng)會未雨綢繆的實現(xiàn)在另外的任務(wù)服務(wù)器上執(zhí)行同樣一個任務(wù),這就是備份任務(wù)。

II. 基本架構(gòu)

與分布式文件系統(tǒng)類似,Map/Reduce的集群,也由三類服務(wù)器構(gòu)成。其中作業(yè)服務(wù)器,在Hadoop中稱為Job Tracker,在Google論文中稱為Master。前者告訴我們,作業(yè)服務(wù)器是負(fù)責(zé)管理運行在此框架下所有作業(yè)的,后者告訴我們,它也是為各個作業(yè)分配任務(wù)的核心。與HDFS的主控服務(wù)器類似,它也是作為單點存在的,簡化了負(fù)責(zé)的同步流程。具體的負(fù)責(zé)執(zhí)行用戶定義操作的,是任務(wù)服務(wù)器,每一個作業(yè)被拆分成很多的任務(wù),包括Map任務(wù)Reduce任務(wù)等,任務(wù)是具體執(zhí)行的基本單元,它們都需要分配到合適任務(wù)服務(wù)器上去執(zhí)行,任務(wù)服務(wù)器一邊執(zhí)行一邊向作業(yè)服務(wù)器匯報各個任務(wù)的狀態(tài),以此來幫助作業(yè)服務(wù)器了解作業(yè)執(zhí)行的整體情況,分配新的任務(wù)等等。。。
除了作業(yè)的管理者執(zhí)行者,還需要有一個任務(wù)的提交者,這就是客戶端。與分布式文件系統(tǒng)一樣,客戶端也不是一個單獨的進(jìn)程,而是一組API,用戶需要自定義好自己需要的內(nèi)容,經(jīng)由客戶端相關(guān)的代碼,將作業(yè)及其相關(guān)內(nèi)容和配置,提交到作業(yè)服務(wù)器去,并時刻監(jiān)控執(zhí)行的狀況。。。
同作為Hadoop的實現(xiàn),與HDFS的通信機制相同,Hadoop Map/Reduce也是用了協(xié)議接口來進(jìn)行服務(wù)器間的交流。實現(xiàn)者作為RPC服務(wù)器,調(diào)用者經(jīng)由RPC的代理進(jìn)行調(diào)用,如此,完成大部分的通信,具體服 務(wù)器的架構(gòu),和其中運行的各個協(xié)議狀況,參見下圖。從圖中可以看到,與HDFS相比,相關(guān)的協(xié)議少了幾個,客戶端與任務(wù)服務(wù)器,任務(wù)服務(wù)器之間,都不再有 直接通信關(guān)系。這并不意味著客戶端就不需要了解具體任務(wù)的執(zhí)行狀況,也不意味著,任務(wù)服務(wù)器之間不需要了解別家任務(wù)執(zhí)行的情形,只不過,由于整個集群各機 器的聯(lián)系比HDFS復(fù)雜的多,直接通信過于的難以維系,所以,都統(tǒng)一由作業(yè)服務(wù)器整理轉(zhuǎn)發(fā)。另外,從這幅圖可以看到,任務(wù)服務(wù)器不是一個人在戰(zhàn)斗,它會像 孫悟空一樣招出一群寶寶幫助其具體執(zhí)行任務(wù)。這樣做的好處,個人覺得,應(yīng)該有安全性方面的考慮,畢竟,任務(wù)的代碼是用戶提交的,數(shù)據(jù)也是用戶指定的,這質(zhì) 量自然良莠不齊,萬一碰上個搞破壞的,把整個任務(wù)服務(wù)器進(jìn)程搞死了,就因小失大了。因此,放在單獨的地盤進(jìn)行,愛咋咋地,也算是權(quán)責(zé)明確了。。。
與分布式文件系統(tǒng)相比,Map/Reduce框架的還有一個特點,就是可定制性強。文件系統(tǒng)中很多的算法, 都是很固定和直觀的,不會由于所存儲的內(nèi)容不同而有太多的變化。而作為通用的計算框架,需要面對的問題則要復(fù)雜很多,在各種不同的問題、不同的輸入、不同 的需求之間,很難有一種包治百病的藥能夠一招鮮吃遍天。作為Map/Reduce框架而言,一方面要盡可能的抽取出公共的一些需求,實現(xiàn)出來。更重要的, 是需要提供良好的可擴展機制,滿足用戶自定義各種算法的需求。Hadoop是由Java來實現(xiàn)的,因此通過反射來實現(xiàn)自定義的擴展,顯得比較小菜一碟了。 在JobConf類中,定義了大量的接口,這基本上是Hadoop Map/Reduce框架所有可定制內(nèi)容的一次集中展示。在JobConf中,有大量set接口接受一個Class<? extends xxx>的參數(shù),通常它都有一個默認(rèn)實現(xiàn)的類,用戶如果不滿意,則可自定義實現(xiàn)。。。

III. 計算流程

如果一切都按部就班的進(jìn)行,那么整個作業(yè)的計算流程,應(yīng)該是作業(yè)的提交 -> Map任務(wù)的分配和執(zhí)行 -> Reduce任務(wù)的分配和執(zhí)行 -> 作業(yè)的完成。而在每個任務(wù)的執(zhí)行中,又包含輸入的準(zhǔn)備 -> 算法的執(zhí)行 -> 輸出的生成,三個子步驟。沿著這個流程,我們可以很快的整理清晰整個Map/Reduce框架下作業(yè)的執(zhí)行。。。

1、作業(yè)的提交

一個作業(yè),在提交之前,需要把所有應(yīng)該配置的東西都配置好,因為一旦提交到了作業(yè)服務(wù)器上,就陷入了完全自動化的流程,用戶除了觀望,最多也就能起一個監(jiān)督作用,懲治一些不好好工作的任務(wù)。。。
基本上,用戶在提交代碼階段,需要做的工作主要是這樣的:
首先,書寫好所有自定的代碼,最起碼,需要有Map和Reduce的執(zhí)行代碼。在Hadoop中,Map需要派生自Mapper<K1, V1, K2, V2>接口,Reduce需要派生自Reducer<K2, V2, K3, V3>接口。這里都是用的泛型,用以支持不同的鍵值類型。這兩個接口都僅有一個方法,一個是map,一個是reduce,這兩個方法都直接受四個參數(shù),前兩個是輸入的相關(guān)的數(shù)據(jù)結(jié)構(gòu),第三個是作為輸出相關(guān)的數(shù)據(jù)結(jié)構(gòu),最后一個,是一個Reporter類的實例,實現(xiàn)的時候可以利用它來統(tǒng)計一些計數(shù)。除了這兩個接口,還有大量可以派生的接口,比如分割的Partitioner<K2, V2>接口。。。
然后,需要書寫好主函數(shù)的代碼,其中最主要的內(nèi)容就是實例化一個JobConf類的對象,然后調(diào)用其豐富的setXXX接口,設(shè)定好所需的內(nèi)容,包括輸入輸出的文件路徑,Map和Reduce的類,甚至包括讀取寫入文件所需的格式支持類,等等。。。
最后,調(diào)用JobClientrunJob方法,提交此JobConf對象。runJob方法會先行調(diào)用到JobSubmissionProtocol接口所定義的submitJob方法,將此作業(yè),提交給作業(yè)服務(wù)器。接著,runJob開始循環(huán),不停的調(diào)用JobSubmissionProtocol的getTaskCompletionEvents方法,獲得TaskCompletionEvent類的對象實例,了解此作業(yè)各任務(wù)的執(zhí)行狀況。。。

2、Map任務(wù)的分配

當(dāng)一個作業(yè)提交到了作業(yè)服務(wù)器上,作業(yè)服務(wù)器會生成若干個Map任務(wù),每一個Map任務(wù),負(fù)責(zé)將一部分的輸入轉(zhuǎn)換成格式與最終格式相同的中間文件。通常一個作業(yè)的輸入都是基于分布式文件系統(tǒng)的文件(當(dāng)然在單機環(huán)境下,文件系統(tǒng)單機的也可以...),因為,它可以很天然的和分布式的計算產(chǎn)生聯(lián)系。而對于一個Map任務(wù)而言,它的輸入往往是輸入文件的一個數(shù)據(jù)塊,或者是數(shù)據(jù)塊的一部分,但通常,不跨數(shù)據(jù)塊。因為,一旦跨了數(shù)據(jù)塊,就可能涉及到多個服務(wù)器,帶來了不必要的復(fù)雜性。。。
當(dāng)一個作業(yè),從客戶端提交到了作業(yè)服務(wù)器上,作業(yè)服務(wù)器會生成一個JobInProgress對象,作為與 之對應(yīng)的標(biāo)識,用于管理。作業(yè)被拆分成若干個Map任務(wù)后,會預(yù)先掛在作業(yè)服務(wù)器上的任務(wù)服務(wù)器拓?fù)錁?。這是依照分布式文件數(shù)據(jù)塊的位置來劃分的,比如一 個Map任務(wù)需要用某個數(shù)據(jù)塊,這個數(shù)據(jù)塊有三份備份,那么,在這三臺服務(wù)器上都會掛上此任務(wù),可以視為是一個預(yù)分配。。。
關(guān)于任務(wù)管理和分配的大部分的真實功能和邏輯的實現(xiàn),JobInProgress則依托JobInProgressListenerTaskScheduler的子類。TaskScheduler,顧名思義是用于任務(wù)分配的策略類(為了簡化描述,用它代指所有TaskScheduler的子類...)。它會掌握好所有作業(yè)的任務(wù)信息,其assignTasks函數(shù),接受一個TaskTrackerStatus作為參數(shù),依照此任務(wù)服務(wù)器的狀態(tài)和現(xiàn)有的任務(wù)狀況,為其分配新的任務(wù)。而為了掌握所有作業(yè)相關(guān)任務(wù)的狀況,TaskScheduler會將若干個JobInProgressListener注冊到JobTracker中去,當(dāng)有新的作業(yè)到達(dá)、移除或更新的時候,JobTracker會告知給所有的JobInProgressListener,以便它們做出相應(yīng)的處理。。。
任務(wù)分配是一個重要的環(huán)節(jié),所謂任務(wù)分配,就是將合適作業(yè)的合適任務(wù)分配到合適的服務(wù)器上。不難看出,里面 蘊含了兩個步驟,先是選擇作業(yè),然后是在此作業(yè)中選擇任務(wù)。和所有分配工作一樣,任務(wù)分配也是一個復(fù)雜的活。不良好的任務(wù)分配,可能會導(dǎo)致網(wǎng)絡(luò)流量增加、 某些任務(wù)服務(wù)器負(fù)載過重效率下降,等等。不僅如此,任務(wù)分配還是一個無一致模式的問題,不同的業(yè)務(wù)背景,可能需要不同的算法才能滿足需求。因此,在 Hadoop中,有很多TaskScheduler的子類,像Facebook,Yahoo,都為其貢獻(xiàn)出了自家用的算法。在Hadoop中,默認(rèn)的任務(wù) 分配器,是JobQueueTaskScheduler類。它選擇作業(yè)的基本次序是:Map Clean Up Task(Map任務(wù)服務(wù)器的清理任務(wù),用于清理相關(guān)的過期的文件和環(huán)境...) -> Map Setup Task(Map任務(wù)服務(wù)器的安裝任務(wù),負(fù)責(zé)配置好相關(guān)的環(huán)境...) -> Map Tasks -> Reduce Clean Up Task -> Reduce Setup Task -> Reduce Tasks。在這個前提下,具體到Map任務(wù)的分配上來。當(dāng)一個任務(wù)服務(wù)器工作的游刃有余,期待獲得新的任務(wù)的時候,JobQueueTaskScheduler會按照各個作業(yè)的優(yōu)先級,從最高優(yōu)先級的作業(yè)開 始分配。每分配一個,還會為其留出余量,已被不時之需。舉一個例子:系統(tǒng)目前有優(yōu)先級3、2、1的三個作業(yè),每個作業(yè)都有一個可分配的Map任務(wù),一個任 務(wù)服務(wù)器來申請新的任務(wù),它還有能力承載3個任務(wù)的執(zhí)行,JobQueueTaskScheduler會先從優(yōu)先級3的作業(yè)上取一個任務(wù)分配給它,然后再 留出一個1任務(wù)的余量。此時,系統(tǒng)只能在將優(yōu)先級2作業(yè)的任務(wù)分配給此服務(wù)器,而不能分配優(yōu)先級1的任務(wù)。這樣的策略,基本思路就是一切為高優(yōu)先級的作業(yè)服務(wù),優(yōu)先分配不說,分配了好保留有余力以備不時之需,如此優(yōu)待,足以讓高優(yōu)先級的作業(yè)喜極而泣,讓低優(yōu)先級的作業(yè)感慨既生瑜何生亮,甚至是活活餓死。。。
確定了從哪個作業(yè)提取任務(wù)后,具體的分配算法,經(jīng)過一系列的調(diào)用,最后實際是由JobInProgressfindNewMapTask函數(shù)完成的。它的算法很簡單,就是盡全力為此服務(wù)器非配且盡可能好的分配任務(wù), 也就是說,只要還有可分配的任務(wù),就一定會分給它,而不考慮后來者。作業(yè)服務(wù)器會從離它最近的服務(wù)器開始,看上面是否還掛著未分配的任務(wù)(預(yù)分配上的), 從近到遠(yuǎn),如果所有的任務(wù)都分配了,那么看有沒有開啟多次執(zhí)行,如果開啟,考慮把未完成的任務(wù)再分配一次(后面有地方詳述...)。。。
對于作業(yè)服務(wù)器來說,把一個任務(wù)分配出去了,并不意味著它就徹底解放,可以對此任務(wù)可以不管不顧了。因為任務(wù)可以在任務(wù)服務(wù)器上執(zhí)行失敗,可能執(zhí)行緩慢,這都需要作業(yè)服務(wù)器幫助它們再來一次。因此在Task中,記錄有一個TaskAttemptID,對于任務(wù)服務(wù)器而言,它們每次跑的,其實都只是一個Attempt而已,Reduce任務(wù)只需要采信一個的輸出,其他都算白忙乎了。。。

3、Map任務(wù)的執(zhí)行

與HDFS類似,任務(wù)服務(wù)器是通過心跳消息,向作業(yè)服務(wù)器匯報此時此刻其上各個任務(wù)執(zhí)行的狀況,并向作業(yè)服務(wù)器申請新的任務(wù)的。具體實現(xiàn),是TaskTracker調(diào)用InterTrackerProtocol協(xié)議的heartbeat方法來做的。這個方法接受一個TaskTrackerStatus對象作為參數(shù),它描述了此時此任務(wù)服務(wù)器的狀態(tài)。當(dāng)其有余力接受新的任務(wù)的時候,它還會傳入acceptNewTasks為true的參數(shù),表示希望作業(yè)服務(wù)器委以重任。JobTracker接收到相關(guān)的參數(shù)后,經(jīng)過處理,會返回一個HeartbeatResponse對象。這個對象中,定義了一組TaskTrackerAction,用于指導(dǎo)任務(wù)服務(wù)器進(jìn)行下一步的工作。系統(tǒng)中已定義的了一堆其TaskTrackerAction的子類,有的對攜帶的參數(shù)進(jìn)行了擴充,有的只是標(biāo)明了下ID,具體不詳寫了,一看便知。。。
當(dāng)TaskTracker收到的TaskTrackerAction中,包含了LaunchTaskAction,它會開始執(zhí)行所分配的新的任務(wù)。在TaskTracker中,有一個TaskTracker.TaskLauncher線程(確切的說是兩個,一個等Map任務(wù),一個等Reduce任務(wù)),它們在癡癡的守候著新任務(wù)的來到。一旦等到了,會最終調(diào)用到Task的createRunner方法,構(gòu)造出一個TaskRunner對象,新建一個線程來執(zhí)行。對于一個Map任務(wù),它對應(yīng)的Runner是TaskRunner的子類MapTaskRunner, 不過,核心部分都在TaskRunner的實現(xiàn)內(nèi)。TaskRunner會先將所需的文件全部下載并拆包好,并記錄到一個全局緩存中,這是一個全局的目 錄,可以供所有此作業(yè)的所有任務(wù)使用。它會用一些軟鏈接,將一些文件名鏈接到這個緩存中來。然后,根據(jù)不同的參數(shù),配置出一個JVM執(zhí)行的環(huán)境,這個環(huán)境 與JvmEnv類的對象對應(yīng)。
接著,TaskRunner會調(diào)用JvmManagerlaunchJvm方 法,提交給JvmManager處理。JvmManager用于管理該TaskTracker上所有運行的Task子進(jìn)程。在目前的實現(xiàn)中,嘗試的是池化 的方式。有若干個固定的槽,如果槽沒有滿,那么就啟動新的子進(jìn)程,否則,就尋找idle的進(jìn)程,如果是同Job的直接放進(jìn)去,否則殺死這個進(jìn)程,用一個新 的進(jìn)程代替。每一個進(jìn)程都是由JvmRunner來管理的,它也是位于單獨線程中的。但是從實現(xiàn)上看,這個機制好像沒有部署開,子進(jìn)程是死循環(huán)等待,而不 會阻塞在父進(jìn)程的相關(guān)線程上,父線程的變量一直都沒有個調(diào)整,一旦分配,始終都處在繁忙的狀況了。
真實的執(zhí)行載體,是Child,它包含一個 main函數(shù),進(jìn)程執(zhí)行,會將相關(guān)參數(shù)傳進(jìn)來,它會拆解這些參數(shù),并且構(gòu)造出相關(guān)的Task實例,調(diào)用其run函數(shù)進(jìn)行執(zhí)行。每一個子進(jìn)程,可以執(zhí)行指定 個數(shù)量的Task,這就是上面所說的池化的配置。但是,這套機制在我看來,并沒有運行起來,每個進(jìn)程其實都沒有機會不死而執(zhí)行新的任務(wù),只是傻傻的等待進(jìn) 程池滿,而被一刀斃命。也許是我老眼昏花,沒看出其中實現(xiàn)的端倪。。。

4、Reduce任務(wù)的分配與執(zhí)行

比之Map任務(wù),Reduce的分配及其簡單,基本上是所有Map任務(wù)完成了,有空閑的任務(wù)服務(wù)器,來了就給分配一個Job任務(wù)。因為Map任 務(wù)的結(jié)果星羅棋布,且變化多端,真要搞一個全局優(yōu)化的算法,絕對是得不償失。而Reduce任務(wù)的執(zhí)行進(jìn)程的構(gòu)造和分配流程,與Map基本完全的一致,沒 有啥可說的了。。。
但其實,Reduce任務(wù)與Map任務(wù)的最大不同,是Map任務(wù)的文件都在本地隔著,而Reduce任務(wù)需要到處采集。這個流程是作業(yè)服務(wù)器經(jīng) 由此Reduce任務(wù)所處的任務(wù)服務(wù)器,告訴Reduce任務(wù)正在執(zhí)行的進(jìn)程,它需要的Map任務(wù)執(zhí)行過的服務(wù)器地址,此Reduce任務(wù)服務(wù)器會于原 Map任務(wù)服務(wù)器聯(lián)系(當(dāng)然本地就免了...),通過FTP服務(wù),下載過來。這個隱含的直接數(shù)據(jù)聯(lián)系,就是執(zhí)行Reduce任務(wù)與執(zhí)行Map任務(wù)最大的不 同了。。。

5、作業(yè)的完成

當(dāng)所有Reduce任務(wù)都完成了,所需數(shù)據(jù)都寫到了分布式文件系統(tǒng)上,整個作業(yè)才正式完成了。此中,涉及到很多的類,很多的文件,很多的服務(wù)器,所以說起來很費勁,話說,一圖解千語,說了那么多,我還是畫兩幅圖,徹底表達(dá)一下吧。。。
首先,是一個時序圖。它模擬了一個由3個Map任務(wù)和1個Reduce任務(wù)構(gòu)成的作業(yè)執(zhí)行流程。我們可以看到,在執(zhí)行的過程中,只要有人太慢, 或者失敗,就會增加一次嘗試,以此換取最快的執(zhí)行總時間。一旦所有Map任務(wù)完成,Reduce開始運作(其實,不一定要這樣的...),對于每一個 Map任務(wù)來說,只有執(zhí)行到Reduce任務(wù)把它上面的數(shù)據(jù)下載完成,才算成功,否則,都是失敗,需要重新進(jìn)行嘗試。。。
而第二副圖,不是我畫的,就不轉(zhuǎn)載了,參見這里, 它描述了整個Map/Reduce的服務(wù)器狀況圖,包括整體流程、所處服務(wù)器進(jìn)程、輸入輸出等,看清楚這幅圖,對Map/Reduce的基本流程應(yīng)該能完 全跑通了。有這幾點,可能圖中描述的不夠清晰需要提及一下,一個是在HDFS中,其實還有日志文件,圖中沒有標(biāo)明;另一個是步驟5,其實是由 TaskTracker主動去拉取而不是JobTracker推送過來的;還有步驟8和步驟11,創(chuàng)建出來的MapTask和ReduceTask,在 Hadoop中都是運行在獨立的進(jìn)程上的。。。

IV. Map任務(wù)詳請

從上面,可以了解到整個Map和Reduce任務(wù)的整體流程,而后面要啰嗦的,是具體執(zhí)行中的細(xì)節(jié)。Map任務(wù)的輸入,是分布式文件系統(tǒng)上的, 包含鍵值對信息的文件。為了給每一個Map任務(wù)指定輸入,我們需要掌握文件格式把它分切成塊,并從每一塊中分離出鍵值信息。在HDFS中,輸入的文件格 式,是由InputFormat<K, V>類來表示的,在JobConf中,它的默認(rèn)值是TextInputFormat類(見getInputFormat),此類是特化的FileInputFormat<LongWritable, Text>子類,而FileInputFormat<K, V>正是InputFormat<K, V>的子類。通過這樣的關(guān)系我們可以很容易的理解,默認(rèn)的文件格式是文本文件,且鍵是LongWritable類型(整形數(shù)),值是Text類型(字符串)。僅僅知道文件類型是不夠的,我們還需要將文件中的每一條數(shù)據(jù),分離成鍵值對,這個工作,是RecordReader<K, V>來做的。在TextInputFormat的getRecordReader方法中我們可以看到,與TextInputFormat默認(rèn)配套使用的,是LineRecordReader類,是特化的RecordReader<LongWritable, Text>的子類,它將每一行作為一個記錄,起始的位置作為鍵,整行的字符串作為值。有了格式,分出了鍵值,還需要切開分給每一個Map任務(wù)。每一個Map任務(wù)的輸入用InputSplit接口表示,對于一個文件輸入而言,其實現(xiàn)是FileSplit,它包含著文件名、起始位置、長度和存儲它的一組服務(wù)器地址。。。
當(dāng)Map任務(wù)拿到所屬的InputSplit后,就開始一條條讀取記錄,并調(diào)用用于定義的Mapper,進(jìn)行計算(參見MapRunner<K1, V1, K2, V2>和MapTask的run方法),然后,輸出。MapTask會傳遞給Mapper一個OutputCollector<K, V>對象,作為輸出的數(shù)據(jù)結(jié)構(gòu)。它定義了一個collect的函數(shù),接受一個鍵值對。在MapTask中,定義了兩個OutputCollector的子類,一個是MapTask.DirectMapOutputCollector<K, V>,人如其名,它的實現(xiàn)確實很Direct,直截了當(dāng)。它會利用一個RecordWriter<K, V>對象,collect一調(diào)用,就直接調(diào)用RecordWriter<K, V>的write方法,寫入本地的文件中去。如果覺著RecordWriter<K, V>出現(xiàn)的很突兀,那么看看上一段提到的RecordReader<K, V>,基本上,數(shù)據(jù)結(jié)構(gòu)都是對應(yīng)著的,一個是輸入一個是輸出。輸出很對稱也是由RecordWriter<K, V>和OutputFormat<K, V>來協(xié)同完成的,其默認(rèn)實現(xiàn)是LineRecordWriter<K, V>和TextOutputFormat<K, V>,多么的眼熟啊。。。
除了這個非常直接的實現(xiàn)之外,MapTask中還有一個復(fù)雜的多的實現(xiàn),是MapTask.MapOutputBuffer<K extends Object, V extends Object>。有道是簡單壓倒一切,那為什么有很簡單的實現(xiàn),要琢磨一個復(fù)雜的呢。原因在于,看上去很美的往往帶著刺,簡單的輸出實現(xiàn),每調(diào)用一 次collect就寫一次文件,頻繁的硬盤操作很有可能導(dǎo)致此方案的低效。為了解決這個問題,這就有了這個復(fù)雜版本,它先開好一段內(nèi)存做緩存,然后制定一個比例做閾值開一個線程監(jiān)控此緩存。collect來的內(nèi)容,先寫到緩存中,當(dāng)監(jiān)控線程發(fā)現(xiàn)緩存的內(nèi)容比例超過閾值,掛起所有寫入操作,建一個新的文件,把緩存的內(nèi)容批量刷到此文件中去,清空緩存,重新開放,接受繼續(xù)collect。。。
為什么說是刷到文件中去呢。因為這不是一個簡單的照本宣科簡單復(fù)制的過程,在寫入之前,會先將緩存中的內(nèi)存,經(jīng)過排序、合并器 (Combiner)統(tǒng)計之后,才會寫入。如果你覺得Combiner這個名詞聽著太陌生,那么考慮一下Reducer,Combiner也就是一個 Reducer類,通過JobConf的setCombinerClass進(jìn)行設(shè)置,在常用的配置中,Combiner往往就是用用戶為Reduce任務(wù) 定義的那個Reducer子類。只不過,Combiner只是服務(wù)的范圍更小一些而已,它在Map任務(wù)執(zhí)行的服務(wù)器本地,依照Map處理過的那一小部分?jǐn)?shù) 據(jù),先做一次Reduce操作,這樣,可以壓縮需要傳輸內(nèi)容的大小,提高速度。每一次刷緩存,都會開一個新的文件,等此任務(wù)所有的輸入都處理完成后,就有 了若干個有序的、經(jīng)過合并的輸出文件。系統(tǒng)會將這些文件搞在一起,再做一個多路的歸并外排,同時使用合并器進(jìn)行合并,最終,得到了唯一的、有序的、經(jīng)過合 并的中間文件(注:文件數(shù)量等同于分類數(shù)量,在不考慮分類的時候,簡單的視為一個...)。它,就是Reduce任務(wù)夢寐以求的輸入文件。。。
除了做合并,復(fù)雜版本的OutputCollector,還具有分類的功能。分類,是通過Partitioner<K2, V2>來定義的,默認(rèn)實現(xiàn)是HashPartitioner<K2, V2>,作業(yè)提交者可以通過JobConf的setPartitionerClass來自定義。分類的含義是什么呢,簡單的說,就是將Map任務(wù)的輸出,劃分到若干個文件中(通常與Reduce任務(wù)數(shù)目相等),使得每一個Reduce任務(wù),可以處理某一類文件。這樣的好處是大大的,舉一個例子說明一下。比如有一個作業(yè)是進(jìn)行單詞統(tǒng)計的,其Map任務(wù)的中間結(jié)果應(yīng)該是以單詞為鍵,以單詞數(shù)量為值的文件。如果這時候只有一個Reduce任務(wù),那還好說,從全部的Map任務(wù)那里收集文件過來,分別統(tǒng)計得到最后的輸出文件就好。但是,如果單Reduce任務(wù)無法承載此負(fù)載或效率太低,就需要多個Reduce任務(wù)并行執(zhí)行。此時,再沿用之前的模式就有了問題。每個Reduce任務(wù)從一部分Map任務(wù)那 里獲得輸入文件,但最終的輸出結(jié)果并不正確,因為同一個單詞可能在不同的Reduce任務(wù)那里都有統(tǒng)計,需要想方法把它們統(tǒng)計在一起才能獲得最后結(jié)果,這 樣就沒有將Map/Reduce的作用完全發(fā)揮出來。這時候,就需要用到分類。如果此時有兩個Reduce任務(wù),那么將輸出分成兩類,一類存放字母表排序 較高的單詞,一類存放字母表排序低的單詞,每一個Reduce任務(wù)從所有的Map任務(wù)那里獲取一類的中間文件,得到自己的輸出結(jié)果。最終的結(jié)果,只需要把各個Reduce任務(wù)輸出的,拼接在一起就可以了。本質(zhì)上,這就是將Reduce任務(wù)的輸入,由垂直分割,變成了水平分割。Partitioner的作用,正是接受一個鍵值,返回一個分類的序號。它會在從緩存刷到文件之前做這個工作,其實只是多了一個文件名的選擇而已,別的邏輯都不需要變化。。。
除了緩存、合并、分類等附加工作之外,復(fù)雜版本的OutputCollector還支持錯誤數(shù)據(jù)的跳過功能,在后面分布式將排錯的時候,還會提及,標(biāo)記一下,按下不表。。。

V. Reduce任務(wù)詳情

理論上看,Reduce任務(wù)的整個執(zhí)行流程要比Map任務(wù)更為的羅嗦一些,因為,它需要收集輸入文件,然后才能進(jìn)行處理。Reduce任務(wù),主要有這么三個步驟:Copy、Sort、Reduce(參見ReduceTask的run方法)。所謂Copy,就是從執(zhí)行各個Map任務(wù)的服務(wù)器那里,收羅到本地來??截惖娜蝿?wù),是由ReduceTask.ReduceCopier類來負(fù)責(zé),它有一個內(nèi)嵌類,叫MapOutputCopier, 它會在一個單獨的線程內(nèi),負(fù)責(zé)某個Map任務(wù)服務(wù)器上文件的拷貝工作。遠(yuǎn)程拷貝過來的內(nèi)容(當(dāng)然也可以是本地了...),作為MapOutput對象存 在,它可以在內(nèi)存中也可以序列化在磁盤上,這個根據(jù)內(nèi)存使用狀況來自動調(diào)節(jié)。整個拷貝過程是一個動態(tài)的過程,也就是說它不是一次給好所有輸入信息就不再變 化了。它會不停的調(diào)用TaskUmbilicalProtocol協(xié)議的getMapCompletionEvents方 法,向其父TaskTracker詢問此作業(yè)個Map任務(wù)的完成狀況(TaskTracker要向JobTracker詢問后再轉(zhuǎn)告給它...)。當(dāng)獲取 到相關(guān)Map任務(wù)執(zhí)行服務(wù)器的信息后,都會有一個線程開啟,做具體的拷貝工作。同時,還有一個內(nèi)存Merger線程和一個文件Merger線程在同步工 作,它們將新鮮下載過來的文件(可能在內(nèi)存中,簡單的統(tǒng)稱為文件...),做著歸并排序,以此,節(jié)約時間,降低輸入文件的數(shù)量,為后續(xù)的排序工作減 負(fù)。。。
Sort,排序工作,就相當(dāng)于上述排序工作的一個延續(xù)。它會在所有的文件都拷貝完畢后進(jìn)行,因為雖然同步有做著歸并的工作,但可能留著尾巴,沒 做徹底。經(jīng)過這一個流程,該徹底的都徹底了,一個嶄新的、合并了所有所需Map任務(wù)輸出文件的新文件,誕生了。而那些千行萬苦從其他各個服務(wù)器網(wǎng)羅過來的 Map任務(wù)輸出文件,很快的結(jié)束了它們的歷史使命,被掃地出門一掃而光,全部刪除了。。。
所謂好戲在后頭,Reduce任務(wù)的最后一個階段,正是Reduce本身。它也會準(zhǔn)備一個OutputCollector收集輸出,與MapTask不同,這個OutputCollector更為簡單,僅僅是打開一個RecordWriter,collect一次,write一次。最大的不同在于,這次傳入RecordWriter的文件系統(tǒng),基本都是分布式文件系統(tǒng), 或者說是HDFS。而在輸入方面,ReduceTask會從JobConf那里調(diào)用一堆getMapOutputKeyClass、 getMapOutputValueClass、getOutputKeyComparator等等之類的自定義類,構(gòu)造出Reducer所需的鍵類型, 和值的迭代類型Iterator(一個鍵到了這里一般是對應(yīng)一組值)。具體實現(xiàn)頗為拐彎抹角,建議看一下Merger.MergeQueue,RawKeyValueIteratorReduceTask.ReduceValuesIterator等等之類的實現(xiàn)。有了輸入,有了輸出,不斷循環(huán)調(diào)用自定義的Reducer,最終,Reduce階段完成。。。

VI. 分布式支持

1、服務(wù)器正確性保證

Hadoop Map/Reduce服務(wù)器狀況和HDFS很類似,由此可知,救死扶傷的方法也是大同小異。廢話不多說了,直接切正題。同作為客戶端,Map /Reduce的客戶端只是將作業(yè)提交,就開始搬個板凳看戲,沒有占茅坑的行動。因此,一旦它掛了,也就掛了,不傷大雅。而任務(wù)服務(wù)器,也需要隨時與作業(yè) 服務(wù)器保持心跳聯(lián)系,一旦有了問題,作業(yè)服務(wù)器可以將其上運行的任務(wù),移交給它人完成。作業(yè)服務(wù)器,作為一個單點,非常類似的是利用還原點(等同于 HDFS的鏡像)和歷史記錄(等同于HDFS的日志),來進(jìn)行恢復(fù)。其上,需要持久化用于恢復(fù)的內(nèi)容,包含作業(yè)狀況、任務(wù)狀況、各個任務(wù)嘗試的工作狀況 等。有了這些內(nèi)容,再加上任務(wù)服務(wù)器的動態(tài)注冊,就算挪了個窩,還是很容易恢復(fù)的。JobHistory是歷史記錄相 關(guān)的一個靜態(tài)類,本來,它也就是一個干寫日志活的,只是在Hadoop的實現(xiàn)中,對日志的寫入做了面向?qū)ο蟮姆庋b,同時又大量用到觀察者模式做了些嵌入, 使得看起來不是那么直觀。本質(zhì)上,它就是打開若干個日志文件,利用各類接口來往里面寫內(nèi)容。只不過,這些日志,會放在分布式文件系統(tǒng)中,就不需要像 HDFS那樣,來一個SecondXXX隨時候命,由此可見,有巨人在腳下踩著,真好。JobTracker.RecoveryManager類是作業(yè)服 務(wù)器中用于進(jìn)行恢復(fù)相關(guān)的事情,當(dāng)作業(yè)服務(wù)器啟動的時候,會調(diào)用其recover方法,恢復(fù)日志文件中的內(nèi)容。其中步驟,注釋中寫的很清楚,請自行查 看。。。

2、任務(wù)執(zhí)行的正確和速度

整個作業(yè)流程的執(zhí)行,秉承著木桶原理。執(zhí)行的最慢的Map任務(wù)和Reduce任務(wù),決定了系統(tǒng)整體執(zhí)行時間(當(dāng)然,如果執(zhí)行時間在整個流程中占 比例很小的話,也許就微不足道了...)。因此,盡量加快最慢的任務(wù)執(zhí)行速度,成為提高整體速度關(guān)鍵。所使用的策略,簡約而不簡單,就是一個任務(wù)多次執(zhí)行。 當(dāng)所有未執(zhí)行的任務(wù)都分配出去了,并且先富起來的那部分任務(wù)已經(jīng)完成了,并還有任務(wù)服務(wù)器孜孜不倦的索取任務(wù)的時候,作業(yè)服務(wù)器會開始炒剩飯,把那些正在 吭哧吭哧在某個服務(wù)器上慢慢執(zhí)行的任務(wù),再把此任務(wù)分配到一個新的任務(wù)服務(wù)器上,同時執(zhí)行。兩個服務(wù)器各盡其力,成王敗寇,先結(jié)束者的結(jié)果將被采納。這樣 的策略,隱含著一個假設(shè),就是我們相信,輸入文件的分割算法是公平的,某個任務(wù)執(zhí)行慢,并不是由于這個任務(wù)本身負(fù)擔(dān)太重,而是由于服務(wù)器不爭氣負(fù)擔(dān)太重能 力有限或者是即將撒手西去,給它換個新環(huán)境,人挪死樹挪活事半功倍。。。
當(dāng)然,肯定有哽咽的任務(wù),不論是在哪個服務(wù)器上,都無法順利完成。這就說明,此問題不在于服務(wù)器上,而是任務(wù)本身天資有缺憾。缺憾在何處?每個作業(yè),功能 代碼都是一樣的,別的任務(wù)成功了,就是這個任務(wù)不成功,很顯然,問題出在輸入那里。輸入中有非法的輸入條目,導(dǎo)致程序無法辨識,只能揮淚惜別。說到這里, 解決策略也浮出水面了,三十六計走位上,惹不起,還是躲得起的。在MapTask中的 MapTask.SkippingRecordReader<K, V>和ReduceTask里的 ReduceTask.SkippingReduceValuesIterator<KEY,VALUE>,都是用于干這個事情的。它們的原 理很簡單,就是在讀一條記錄前,把當(dāng)前的位置信息,封裝成SortedRanges.Range對象,經(jīng)由Task的 reportNextRecordRange方法提交到TaskTracker上去。TaskTracker會把這些內(nèi)容,擱在TaskStatus對象 中,隨著心跳消息,匯報到JobTracker上面。這樣,作業(yè)服務(wù)器就可以隨時隨刻了解清楚,每個任務(wù)正讀取在那個位置,一旦出錯,再次執(zhí)行的時候,就 在分配的任務(wù)信息里面添加一組SortedRanges信息。MapTask或ReduceTask讀取的時候,會看一下這些區(qū)域,如果當(dāng)前區(qū)域正好處于 上述雷區(qū),跳過不讀。如此反復(fù),正可謂,道路曲折,前途光明啊。。。

VII. 總結(jié)

對于Map/Reduce而言,真正的困難,在于提高其適應(yīng)能力,打造一款能夠包治百病的執(zhí)行框架。Hadoop已經(jīng)做得很好了,但只有真正搞清楚了整個流程,你才能幫助它做的更好。。。

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多

    国产毛片av一区二区三区小说| 国产又粗又猛又爽色噜噜| 日韩欧美中文字幕av| 日本不卡视频在线观看| 亚洲日本中文字幕视频在线观看 | 欧美人妻盗摄日韩偷拍| 在线观看国产成人av天堂野外| 最新日韩精品一推荐日韩精品| 91熟女大屁股偷偷对白| 国产中文字幕一区二区| 韩国日本欧美国产三级| 国产精品香蕉在线的人| 91免费精品国自产拍偷拍| 精品少妇一区二区三区四区| 亚洲av成人一区二区三区在线| 激情偷拍一区二区三区视频| 欧美韩日在线观看一区| 日本高清不卡一二三区| 在线免费观看一二区视频| 日韩成人h视频在线观看| 免费一级欧美大片免费看| 欧美又黑又粗大又硬又爽| 国产在线一区二区三区不卡| 欧美精品亚洲精品日韩精品| 激情丁香激情五月婷婷| 中字幕一区二区三区久久蜜桃| 中日韩美一级特黄大片| 亚洲av专区在线观看| 好吊日在线观看免费视频| 色婷婷在线视频免费播放| 国产二级一级内射视频播放| 91免费精品国自产拍偷拍| 天堂av一区一区一区| 91爽人人爽人人插人人爽| 日本一区二区三区黄色| 熟女体下毛荫荫黑森林自拍| 亚洲中文字幕高清乱码毛片| 久草热视频这里只有精品| 国产目拍亚洲精品区一区| 一二区不卡不卡在线观看| 国产亚洲神马午夜福利|