作者:李智慧 來源:華章科技 大數(shù)據(jù)技術(shù)其實(shí)是分布式技術(shù)在數(shù)據(jù)處理領(lǐng)域的創(chuàng)新性應(yīng)用,其本質(zhì)和此前講到的分布式技術(shù)思路一脈相承,即用更多的計算機(jī)組成一個集群,提供更多的計算資源,從而滿足更大的計算壓力要求。 大數(shù)據(jù)技術(shù)討論的是,如何利用更多的計算機(jī)滿足大規(guī)模的數(shù)據(jù)計算要求。 大數(shù)據(jù)就是將各種數(shù)據(jù)統(tǒng)一收集起來進(jìn)行計算,發(fā)掘其中的價值。這些數(shù)據(jù),既包括數(shù)據(jù)庫的數(shù)據(jù),也包括日志數(shù)據(jù),還包括專門采集的用戶行為數(shù)據(jù);既包括企業(yè)內(nèi)部自己產(chǎn)生的數(shù)據(jù),也包括從第三方采購的數(shù)據(jù),還包括使用網(wǎng)絡(luò)爬蟲獲取的各種互聯(lián)網(wǎng)公開數(shù)據(jù)。 面對如此龐大的數(shù)據(jù),如何存儲、如何利用大規(guī)模的服務(wù)器集群處理計算才是大數(shù)據(jù)技術(shù)的核心。 01 HDFS分布式文件存儲架構(gòu)大規(guī)模的數(shù)據(jù)計算首先要解決的是大規(guī)模數(shù)據(jù)的存儲問題。如何將數(shù)百TB或數(shù)百PB的數(shù)據(jù)存儲起來,通過一個文件系統(tǒng)統(tǒng)一管理,這本身就是一項(xiàng)極大的挑戰(zhàn)。 HDFS的架構(gòu),如圖31-1所示。 ▲圖31-1 HDFS架構(gòu) HDFS可以將數(shù)千臺服務(wù)器組成一個統(tǒng)一的文件存儲系統(tǒng),其中NameNode服務(wù)器充當(dāng)文件控制塊的角色,進(jìn)行文件元數(shù)據(jù)管理,即記錄文件名、訪問權(quán)限、數(shù)據(jù)存儲地址等信息,而真正的文件數(shù)據(jù)則存儲在DataNode服務(wù)器上。 DataNode以塊為單位存儲數(shù)據(jù),所有的塊信息,比如塊ID、塊所在的服務(wù)器IP地址等,都記錄在NameNode服務(wù)器上,而具體的塊數(shù)據(jù)則存儲在DataNode服務(wù)器上。理論上,NameNode可以將所有DataNode服務(wù)器上的所有數(shù)據(jù)塊都分配給一個文件,也就是說,一個文件可以使用所有服務(wù)器的硬盤存儲空間。 此外,HDFS為了保證不會因?yàn)橛脖P或者服務(wù)器損壞而導(dǎo)致文件損壞,還會對數(shù)據(jù)塊進(jìn)行復(fù)制,每個數(shù)據(jù)塊都會存儲在多臺服務(wù)器上,甚至多個機(jī)架上。 02 MapReduce大數(shù)據(jù)計算架構(gòu)數(shù)據(jù)存儲在HDFS上的最終目標(biāo)還是為了計算,通過數(shù)據(jù)分析或者機(jī)器學(xué)習(xí)獲得有益的結(jié)果。但是如果像傳統(tǒng)的應(yīng)用程序那樣把HDFS當(dāng)作普通文件,從文件中讀取數(shù)據(jù)后進(jìn)行計算,那么對于需要一次計算數(shù)百TB數(shù)據(jù)的大數(shù)據(jù)計算場景,就不知道要算到什么時候了。 大數(shù)據(jù)處理的經(jīng)典計算框架是MapReduce。MapReduce的核心思想是對數(shù)據(jù)進(jìn)行分片計算。既然數(shù)據(jù)是以塊為單位分布存儲在很多臺服務(wù)器組成的集群上的,那么能不能就在這些服務(wù)器上針對每個數(shù)據(jù)塊進(jìn)行分布式計算呢? 事實(shí)上,MapReduce可以在分布式集群的多臺服務(wù)器上啟動同一個計算程序,每個服務(wù)器上的程序進(jìn)程都可以讀取本服務(wù)器上要處理的數(shù)據(jù)塊進(jìn)行計算,因此,大量的數(shù)據(jù)就可以同時進(jìn)行計算了。但是這樣一來,每個數(shù)據(jù)塊的數(shù)據(jù)都是獨(dú)立的,如果這些數(shù)據(jù)塊需要進(jìn)行關(guān)聯(lián)計算怎么辦? MapReduce將計算過程分成兩個部分:
下面以經(jīng)典的WordCount,即統(tǒng)計所有數(shù)據(jù)中相同單詞的詞頻數(shù)據(jù)為例,來認(rèn)識map和reduce的處理過程,如圖31-2所示。 ▲圖31-2 詞頻統(tǒng)計程序WordCount的MapReduce處理過程 假設(shè)原始數(shù)據(jù)有兩個數(shù)據(jù)塊,MapReduce框架啟動了兩個map進(jìn)程進(jìn)行處理,它們分別讀入數(shù)據(jù)。 map函數(shù)會對輸入數(shù)據(jù)進(jìn)行分詞處理,然后針對每個單詞輸出<單詞, 1>這樣的<key, value>結(jié)果。然后MapReduce框架進(jìn)行shuffle操作,相同的key發(fā)送給同一個reduce進(jìn)程,reduce的輸入就是<key, value列表>這樣的結(jié)構(gòu),即相同key的value合并成了一個value列表。 在這個示例中,這個value列表就是由很多個1組成的列表。reduce對這些1進(jìn)行求和操作,就得到每個單詞的詞頻結(jié)果了。 具體的MapReduce程序如下: public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); }} 上面講述了map和reduce進(jìn)程合作完成數(shù)據(jù)處理的過程,那么這些進(jìn)程是如何在分布式的服務(wù)器集群上啟動的呢?數(shù)據(jù)是如何流動并最終完成計算的呢?下面以MapReduce1為例來看這個過程,如圖31-3所示。 ▲圖31-3 MapReduce1計算處理過程 MapReduce1主要有JobTracker和TaskTracker這兩種進(jìn)程角色,JobTracker在MapReduce集群中只有一個,而TaskTracker則和DataNode一起啟動在集群的所有服務(wù)器上。 MapReduce應(yīng)用程序JobClient啟動后,會向JobTracker提交作業(yè),JobTracker根據(jù)作業(yè)中輸入的文件路徑分析需要在哪些服務(wù)器上啟動map進(jìn)程,然后就向這些服務(wù)器上的TaskTracker發(fā)送任務(wù)命令。 TaskTracker收到任務(wù)后,啟動一個TaskRunner進(jìn)程下載任務(wù)對應(yīng)的程序,然后反射加載程序中的map函數(shù),讀取任務(wù)中分配的數(shù)據(jù)塊,并進(jìn)行map計算。map計算結(jié)束后,TaskTracker會對map輸出進(jìn)行shuffle操作,然后TaskRunner加載reduce函數(shù)進(jìn)行后續(xù)計算。 HDFS和MapReduce都是Hadoop的組成部分。 03 Hive大數(shù)據(jù)倉庫架構(gòu)MapReduce雖然只有map和reduce這兩個函數(shù),但幾乎可以滿足任何大數(shù)據(jù)分析和機(jī)器學(xué)習(xí)的計算場景。不過,復(fù)雜的計算可能需要使用多個job才能完成,這些job之間還需要根據(jù)其先后依賴關(guān)系進(jìn)行作業(yè)編排,開發(fā)比較復(fù)雜。 傳統(tǒng)上,主要使用SQL進(jìn)行數(shù)據(jù)分析,如果能根據(jù)SQL自動生成MapReduce,就可以極大降低大數(shù)據(jù)技術(shù)在數(shù)據(jù)分析領(lǐng)域的應(yīng)用門檻。 Hive就是這樣一個工具。我們來看對于如下一條常見的SQL語句,Hive是如何將其轉(zhuǎn)換成MapReduce計算的。
這是一條常見的SQL統(tǒng)計分析語句,用于統(tǒng)計不同年齡的用戶訪問不同網(wǎng)頁的興趣偏好,具體數(shù)據(jù)輸入和執(zhí)行結(jié)果示例如圖31-4所示。 ▲圖31-4 SQL統(tǒng)計分析輸入數(shù)據(jù)和執(zhí)行結(jié)果舉例 看這個示例我們就會發(fā)現(xiàn),這個計算場景和WordCount很像。事實(shí)上也確實(shí)如此,我們可以用MapReduce完成這條SQL的處理,如圖31-5所示。 ▲圖31-5 MapReduce完成SQL處理過程舉例 map函數(shù)輸出的key是表的行記錄,value是1,reduce函數(shù)對相同的行進(jìn)行記錄,也就是針對具有相同key的value集合進(jìn)行求和計算,最終得到SQL的輸出結(jié)果。 Hive要做的就是將SQL翻譯成MapReduce程序代碼。實(shí)際上,Hive內(nèi)置了很多Operator,每個Operator完成一個特定的計算過程,Hive將這些Operator構(gòu)造成一個有向無環(huán)圖DAG,然后根據(jù)這些Operator之間是否存在shuffle將其封裝到map或者reduce函數(shù)中,之后就可以提交給MapReduce執(zhí)行了。 Operator組成的DAG如圖31-6所示,這是一個包含where查詢條件的SQL,where查詢條件對應(yīng)一個FilterOperator。 ▲圖31-6 示例SQL的MapReduce 有向無環(huán)圖DAG Hive整體架構(gòu)如圖31-7所示。Hive的表數(shù)據(jù)存儲在HDFS。表的結(jié)構(gòu),比如表名、字段名、字段之間的分隔符等存儲在Metastore中。用戶通過Client提交SQL到Driver,Driver請求Compiler將SQL編譯成如上示例的DAG執(zhí)行計劃中,然后交給Hadoop執(zhí)行。 ▲圖31-7 Hive整體架構(gòu) 04 Spark快速大數(shù)據(jù)計算架構(gòu)MapReduce主要使用硬盤存儲計算過程中的數(shù)據(jù),雖然可靠性比較高,但是性能卻較差。 此外,MapReduce只能使用map和reduce函數(shù)進(jìn)行編程,雖然能夠完成各種大數(shù)據(jù)計算,但是編程比較復(fù)雜。而且受map和reduce編程模型相對簡單的影響,復(fù)雜的計算必須組合多個MapReduce job才能完成,編程難度進(jìn)一步增加。 Spark在MapReduce的基礎(chǔ)上進(jìn)行了改進(jìn),它主要使用內(nèi)存進(jìn)行中間計算數(shù)據(jù)存儲,加快了計算執(zhí)行時間,在某些情況下性能可以提升上百倍。 Spark的主要編程模型是RDD,即彈性數(shù)據(jù)集。在RDD上定義了許多常見的大數(shù)據(jù)計算函數(shù),利用這些函數(shù)可以用極少的代碼完成較為復(fù)雜的大數(shù)據(jù)計算。前面舉例的WorkCount如果用Spark編程,只需要三行代碼: val textFile = sc.textFile('hdfs://...')val counts = textFile.flatMap(line => line.split(' ')) .map(word => (word, 1)) .reduceByKey(_ + _)counts.saveAsTextFile('hdfs://...') 首先,從HDFS讀取數(shù)據(jù),構(gòu)建出一個RDD textFile。然后,在這個RDD上執(zhí)行三個操作:
上面代碼中flatMap、map、reduceByKey都是Spark的RDD轉(zhuǎn)換函數(shù),RDD轉(zhuǎn)換函數(shù)的計算結(jié)果還是RDD,所以上面三個函數(shù)可以寫在一行代碼上,最后得到的還是RDD。 Spark會根據(jù)程序中的轉(zhuǎn)換函數(shù)生成計算任務(wù)執(zhí)行計劃,這個執(zhí)行計劃就是一個DAG。Spark可以在一個作業(yè)中完成非常復(fù)雜的大數(shù)據(jù)計算,Spark DAG示例如圖31-8所示。 ▲圖31-8 Spark RDD有向無環(huán)圖DAG示例 在圖31-8中,A、C和E是從HDFS上加載的RDD。A經(jīng)過groupBy分組統(tǒng)計轉(zhuǎn)換函數(shù)計算后得到RDD B,C經(jīng)過map轉(zhuǎn)換函數(shù)計算后得到RDD D,D和E經(jīng)過union合并轉(zhuǎn)換函數(shù)計算后得到RDD F,B和F經(jīng)過join連接轉(zhuǎn)換函數(shù)計算后得到最終結(jié)果RDD G。 05 大數(shù)據(jù)流計算架構(gòu)Spark雖然比MapReduce快很多,但是在大多數(shù)場景下計算耗時依然是分鐘級別的,這種計算一般被稱為大數(shù)據(jù)批處理計算。而在實(shí)際應(yīng)用中,有些時候需要在毫秒級完成不斷輸入的海量數(shù)據(jù)的計算處理,比如實(shí)時對攝像頭采集的數(shù)據(jù)進(jìn)行監(jiān)控分析,這就是所謂的大數(shù)據(jù)流計算。 早期比較著名的流式大數(shù)據(jù)計算引擎是Storm,后來隨著Spark的火爆,Spark上的流式計算引擎Spark Streaming也逐漸流行起來。Spark Streaming的架構(gòu)原理是將實(shí)時流入的數(shù)據(jù)切分成小的一批一批的數(shù)據(jù),然后將這些小的一批批數(shù)據(jù)交給Spark執(zhí)行。 由于數(shù)據(jù)量比較小,Spark Streaming又常駐系統(tǒng),不需要重新啟動,因此可以在毫秒級完成計算,看起來像是實(shí)時計算一樣,如圖31-9所示。 ▲圖31-9 Spark Streaming流計算將實(shí)時流式數(shù)據(jù)轉(zhuǎn)化成小的批處理計算 最近幾年比較流行的大數(shù)據(jù)引擎Flink其架構(gòu)原理和Spark Streaming很相似,它可以基于不同的數(shù)據(jù)源,根據(jù)數(shù)據(jù)量和計算場景的要求,靈活地適應(yīng)流計算和批處理計算。 06 小結(jié)大數(shù)據(jù)技術(shù)可以說是分布式技術(shù)的一個分支,兩者都是面臨大量的計算壓力時,采用分布式服務(wù)器集群的方案解決問題。差別是大數(shù)據(jù)技術(shù)要處理的數(shù)據(jù)具有關(guān)聯(lián)性,所以需要有個中心服務(wù)器進(jìn)行管理,NameNode、JobTracker都是這樣的中心服務(wù)器。 而高并發(fā)的互聯(lián)網(wǎng)分布式系統(tǒng)為了提高系統(tǒng)可用性,降低中心服務(wù)器可能會出現(xiàn)的瓶頸壓力、提升性能,通常不會在架構(gòu)中使用這樣的中心服務(wù)器。
|
|