Hadoop原理及應用

奇思妙想002 發佈 2024-04-27T02:54:17.464924+00:00

Hadoop是一個大數據解決方案,提供了一套分布式系統基礎架構。HDFS(Hadoop Distributed File System):分布式文件系統,HDFS分為NameNode和DataNode,NameNode負責保存元數據的基本信息,DataNode負責具體數據的存儲。

hadoop是一個大數據解決方案,提供了一套分布式系統基礎架構。

hdfs(Hadoop Distributed File System):分布式文件系統,HDFS分為NameNode和DataNode,NameNode負責保存元數據的基本信息,DataNode負責具體數據的存儲。

MapReduce:計算引擎。分為JobTracker和TaskTracker,JobTracker負責任務的分發,TaskTracker負責具體任務的執行。

YARN(Yet Another Resource Negotiator):統一資源管理調度

Hadoop集群是Master/Slave(M/S)架構,NameNode和JobTracker運行在Master節點上,DataNode和TaskTracker運行在Slave節點上。

1.HDFS

HDFS是Hadoop的存儲系統,包含客戶端(Client)、元數據節點(NameNode)、備份節點(Secondary NameNode)和數據存儲節點(DataNode)。

1.1.Client:

ClientClient是HDFS客戶端,它提供了一個文件系統接口,用戶通過Client對NameNode和DataNode進行交互訪問來操作HDFS中的文件。

1.2.NameNode:

NameNode是HDFS的管理節點,負責HDFS的目錄樹和相關的文件元數據的存儲。這些元數據以HDFS元數據鏡像文件(fsimage)和HDFS文件改動日誌(editlog)兩種形式存放在本地磁碟上。HDFS元數據鏡像文件和HDFS文件改動日誌在HDFS啟動時構造。此外,NameNode負責監控集群中DataNode的健康狀態,一旦發現某個DataNode宕掉,則將該DataNode從HDFS集群移除並在其他DataNode上重新備份該DataNode的數據(該過程被稱為數據重平衡,即rebalance),以保障數據副本的完整性和集群的高可用性。

1.3.Secondary NameNode:

Secondary NameNode是NameNode元數據的備份,在NameNode宕機後,Secondary NameNode會接替NameNode的工作,負責整個集群的管理。同時,為了減小NameNode的壓力,NameNode並不會自己合併HDFS元數據鏡像文件和HDFS文件改動日誌,而是將該任務交由Secondary NameNode完成,Secondary NameNode在合併完成後將結果發送到NameNode,NameNode再將合併後的結果存儲到本地磁碟。

1.4.DataNode:

DataNode負責具體的數據存儲,並將數據的元信息定期匯報給NameNode。DataNode以固定大小的塊(Block)為基本單位組織和存儲文件內容(Block的大小通過dfs.blocksize設置,默認為64MB)。

1.5.HDFS文件的寫入流程

在應用程式調用客戶端API上傳一個文件到HDFS時,該請求首先會到達NameNode,NameNode檢查文件的狀態和權限。如果通過檢查,則寫本地文件改動日誌並返回可寫的DataNode列表;客戶端在收到列表後將文件切分為固定大小的多個塊發送到最近的DataNode上;等待每個DataNode上的數據塊都寫完成並返回確認信息後,文件寫操作完成。

1.6.HDFS文件的讀取流程

在應用程式讀取文件時,首先會將請求發送給NameNode,NameNode返回文件塊所在的DataNode列表(DataNode列表按照客戶端距離DataNode網絡拓撲的遠近排序),應用程式從每個DataNode上都獲取數據並將數據讀取到本地。

1.7.HDFS數據副本

為了保證數據可靠性,HDFS會將同一個數據塊對應的數據副本(數據副本個數默認為3)存儲在多個不同的DataNode上。在某個DataNode宕機後,HDFS會將該節點上的數據存儲到其他可用的DataNode上,以保障數據的安全。

2.MapReduce

Hadoop MapReduce是一個分布式計算框架,其最大的特點是基於分布式文件系統的海量數據(至少TB級數據)的分析特徵,在不同的節點上快速地完成計算並將計算結果合併返回。

Hadoop MapReduce採用了Master/Slave架構。Master/Slave架構由一個Master節點的JobTracker和多個Slave節點的TaskTracker共同組成。

MapReduce的作業會輸入的數據集按照Partition分為若干獨立的數據集,每個數據數據集按照Map方式完全並行地進行處理。在Map階段執行完成後將Map的計算結果進行排序,然後把Map階段的計算結果輸入Reduce任務,Reduce任務執行完成後將最終的結果返回給客戶端。

2.1.Job Client

MapReduce程序通過Job Client提交到JobTracker並運行;通過Client提供的接口查看程序運行狀態。

2.2.JobTracker

JobTracker主要負責作業的調度和資源的監控。JobTracker在將作業調度起來後會實時監控所有作業中任務的運行狀態,一旦發現某個任務執行失敗,則會將該任務轉移到其他健康的節點上運行,以保障任務的正常運行;同時,JobTracker會實時跟蹤任務的執行狀態、執行進度、系統資源使用情況等信息,並將這些信息同步到任務調度器,調度器會根據每個節點上系統資源的使用情況為每個節點分配不同的任務。

2.3.TaskTracker

TaskTracker接收JobTracker發送過來的命令並執行相應的操作(如啟動新任務、殺死任務等),同時會周期性地通過Heartbeat將本節點上任務的運行情況和資源的使用情況匯報給JobTracker。

2.4.Task

Task是具體執行計算的任務,分為Map Task和Reduce Task兩種,均由TaskTracker啟動。MapReduce處理的基本單位是Data Split,Data Split包含了任務元數據信息(例如,數據起始位置、數據長度、數據所在節點等),Data Split的劃分方法完全由客戶端決定,Data Split的多少決定了Map Task的數量,因為每個Data Split都被交由一個Map Task處理。

MapReduce Map Task的執行過程如圖所示。在任務讀取到數據後會將數據載入HDFS,Map Task首先將HDFS按照分區(Partition)分為多個Data Split,每一個Data Split都被疊代解析成一個個Key-Value鍵值對,然後依次調用用戶自定義的Map()函數對數據進行處理,並將臨時處理的結果存放到HDFS供Reduce階段使用,HDFS的臨時數據也會被分成若干分區(Partition),每個分區的數據都將被一個Reduce Task任務執行和處理

2.5.Reduce Task的執行過程

Reduce Task的執行過程分為3個階段。

(1)Shuffle階段:從遠程節點上讀取Map Task的中間結果。

(2)Map階段:按照Key調用Map()函數處理Key-Value鍵值對的數據,將處理結果寫入HDFS供Reduce階段使用。

(3)Reduce階段:依次讀取<Key,Value List>,調用Reduce()函數處理數據,並將最終結果存入HDFS。

2.6.Hadoop MapReduce的作業生命周期

作業提交與初始化、任務調度與監控、任務運行環境準備、任務執行和作業完成5個階段。

(1)作業提交與初始化:用戶在執行Submit命令提交作業後,會通過Job Client將作業的相關信息(例如,程序JAR包、依賴JAR包、配置文件、分片元數據信息文件等)上傳到HDFS。其中,分片元數據信息文件記錄了每個輸入分片的邏輯位置信息。然後Job Client通過RPC接口通知JobTracker有新作業提交。JobTracker在收到新作業提交的請求後,通過作業調度模塊對作業進行初始化。在初始化完成後,調度模塊會為作業創建一個JobInProgress對象以跟蹤作業的運行狀況,同時,JobInProgress會為每個Task都創建一個TaskInProgress對象以跟蹤每個任務的運行狀況,一個TaskInProgress可能需要管理多個Task Attempt。

(2)任務調度與監控:通過JobTracker來完成。TaskTracker定時通過心跳向JobTracker匯報當前節點的伺服器資源使用情況。當伺服器有空閒資源時,JobTracker會按照一定的策略選擇一個合適的任務在該伺服器上執行,該過程為任務的調度過程。任務調度過程的核心是優先考慮數據本地性,即Hadoop會儘量將任務在其所對應的數據節點上執行。除了作業的調度,JobTracker還負責跟蹤作業的整個運行過程,當發現TaskTracker或Task運行失敗時,JobTracker會將計算轉移到其他健康的節點上;當發現某個Task的執行進度遠落後於同一作業的其他Task時,JobTracker會重新啟動一個相同的Task來執行,並以率先執行完成的Task計算結果作為最終結果。

(3)任務運行環境準備:在運行環境準備期間,TaskTracker會為每個Task都申請一定的獨立資源並啟動一個JVM來運行,以避免不同的Task在運行過程中相互影響;同時,TaskTracker使用作業系統進程來實現資源隔離,以防止Task之間相互爭搶系統資源(主要指內存、CPU等資源)。

(4)任務執行:TaskTracker為Task準備好運行環境後會啟動Task並開始運行Task。在Task運行過程中,每個Task運行的最新進度都首先由Task通過RPC接口匯報給TaskTracker,再由TaskTracker匯報給JobTracker。

(5)作業完成:在等待所有Task執行完成後,整個作業才執行成功,並將執行結果輸出到HDFS。

YARN

YARN是一個分布式資源管理和任務調度框架,其核心由ResourceManager、NodeManager、ApplicationMaster 3個模塊組成。其中,ResourceManager負責集群資源的管理、監控和分配,NodeManager負責節點的維護,ApplicationMaster負責具體應用程式的調度和協調。對於所有的應用,ResourceManager擁有絕對的控制權和對資源的分配權。而每個ApplicationMaster都會與ResourceManager協商資源,同時與NodeManager通信來執行和監控Task。如下圖

ResourceManager

ResourceManager為YARN的資源管理系統,負責整個集群的資源管理和分配。NodeManager以心跳的方式向ResourceManager匯報資源使用情況(主要是CPU和內存的使用情況)。ResourceManager只接收NodeManager的資源匯報信息,對於具體的資源處理則交給NodeManager自己處理。

NodeManager

NodeManager負責具體節點的資源管理和任務分配,它是ResourceManager管理該節點的代理,負責該節點程序的運行和資源的管理與監控。YARN集群的每個節點都運行一個NodeManager。NodeManager定時向ResourceManager匯報本節點資源(CPU和內存等)的使用情況和Container的運行狀態。當主ResourceManager宕機時,NodeManager會自動連接ResourceManager的備用節點。NodeManager接收並處理來自ApplicationMaster的Container啟動、停止等各種請求。

ApplicationMaster

用戶提交的每個應用程式均包含一個ApplicationMaster,它可以運行在ResourceManager以外的機器上。ApplicationMaster的主要職責如下。

(1)申請資源:與ResourceManager協商來獲取系統資源,ResourceManager以Container的形式將資源分配給ApplicationMaster。

(2)啟停任務:與NodeManager通信來啟動或停止任務。

(3)監控任務:監控任務的運行狀態。

(4)錯誤重試:在任務運行失敗後,ApplicationMaster會重新為任務申請資源並重啟該任務。ResourceManager只負責監控ApplicationMaster,並在ApplicationMaster運行失敗後啟動ApplicationMaster。ResourceManager不負責ApplicationMaster內部任務的容錯,任務的容錯由ApplicationMaster自身完成。

Container

Container是YARN所管理的集群中資源的抽象,它封裝了每個節點上的資源信息(例如,內存、CPU、磁碟、網絡等)。在ApplicationMaster向ResourceManager申請資源時,ResourceManager為ApplicationMaster返回的資源便是以Container表示的。YARN會為每個任務都分配一個Container,且該任務只能使用該Container中描述的資源,以達到資源隔離的目的。

YARN的任務提交和運行流程

YARN的任務提交由Client向ResourceManager發起,然後由ResourceManager啟動ApplicationMaster並為其分配用於運行作業的Container資源,ApplicationMaster在收到Container資源列表後初始化Container再交由NodeManager來啟動Container容器並運行具體的任務(MapReduce任務或其他Spark、Flink任務)。在任務運行完成後,ApplicationMaster向ResourceManager註銷自己並釋放資源。如下圖

(1)Client向ResourceManager提交應用程式,其中包括啟動該應用程式ApplicationMaster的必需信息,例如,ApplicationMaster程序、啟動ApplicationMaster的命令、用戶程序等。(2)ResourceManager啟動一個Container,在容器中啟動和運行ApplicationMaster。(3)啟動中的ApplicationMaster向ResourceManager註冊自己,啟動成功後與ResourceManager保持心跳。

(4)ApplicationMaster向ResourceManager發送請求,申請相應數量的Container。

(5)ResourceManager返回ApplicationMaster申請的Container信息。

(6)申請成功的Container由ApplicationMaster進行初始化。

(7)在Container的啟動信息初始化後,ApplicationMaster與對應的NodeManager通信,要求NodeManager啟動Container。

(8)ApplicationMaster與NodeManager保持定時心跳,以便實時對在NodeManager上運行的任務進行監控和管理。

(9)Container在運行期間通過RPC協議向對應的ApplicationMaster匯報自己的進度和狀態等信息,ApplicationMaster對Container進行監控。

(10)在應用程式運行期間,Client通過RPC協議與ApplicationMaster通信獲取應用的狀態、進度更新等信息。

(11)在應用程式運行結束後,ApplicationMaster向ResourceManager註銷自己,並允許屬於它的Container被收回。

Hadoop的安裝和應用

Hadoop的安裝

(1)下載Hadoop安裝包:從Hadoop官網下載需要的Hadoop安裝包,這裡下載最新的3.2.0版本

(2)SSH配置:執行以下命令清單完成SSH配置。

(3)解壓安裝包:到下載目錄執行如下tar命令解壓安裝包

(4)hadoop-env.sh配置:進入Hadoop解壓目錄,執行以下命令,在hadoop-env.sh加入JDK的環境配置。

(5)core-site.xml配置:在Hadoop解壓目錄下執行以下命令,在core-site.xml中加入HDFS配置,指明HDFS集群中fs.defaultFS的服務地址為hdfs://localhost:9000。

(6)hdfs-site.xml配置:在Hadoop解壓目錄下執行以下命令,在hdfs-site.xml中加入HDSF文件的副本數,一般設置為3個副本即可。

(7)初始化HDFS NameNode:在Hadoop解壓目錄下執行namenode-format初始化NameNode。

(8)啟動HDFS:在Hadoop解壓目錄下執行sbin/start-dfs.sh啟動HDFS NameNode和DataNode。啟動命令執行成功後,Shell反饋如下。

在瀏覽器地址欄中輸入http://127.0.0.1:9870/查看HDFS Web頁面及整個HDFS集群的統計信息,注意新版本Hadoop在HDFS的默認埠號是9870

選擇Datanodes選項卡,可以看到DataNode的詳細信息


(9)建立HDFS文件目錄:執行dfs-mkdir命令建立HDFS文件系統目錄。

文件目錄建立後,在HDFS Web管理頁面的Utilities的Browse the file system下能看到剛剛建立的文件

(10)寫入HDFS文件:執行dfs-put命令將磁碟文件寫入HDFS。下面的命令將etc/hadoop目錄下的所有文件都寫入HDFS的input目錄

(11)mapred-site.xml配置:在Hadoop解壓目錄下執行以下命令,在mapredsite.xml中配置mapreduce.framework.name的提交環境為YARN。

(12)yarn-site.xml配置:在Hadoop解壓目錄下執行以下命令,在yarn-site.xml中配置yarn.nodemanager.aux-services為mapreduce_shuffle。


(13)啟動YARN:在Hadoop解壓目錄下執行以下命令,啟動ResourceManager和NodeManager。

在瀏覽器地址欄中輸入http://127.0.0.1:8088/cluster/cluster,可以看到YARN的資源管理調度頁面

Hadoop的應用

MapReduce應用程式概述

MapReduce是Hadoop的分布式計算框架,MapReduce應用程式首先需要聲明數據的輸入和輸出路徑(一般為HDFS路徑),然後實現接口或抽象類的Map和Reduce函數,最後加上作業配置(Job Configuration),就完成了MapReduce應用程式的構建。應用程式由Hadoop的Job Client將作業(JAR包或可執行程序)和配置信息提交給集群Master上的JobTracker,JobTracker負責分發這些作業和配置信息給集群中的Slave,調度任務並監控它們的執行,同時提供狀態和診斷信息給Job Client。

MapReduce應用程式的輸入與輸出模型

MapReduce框架運行的基礎數據模型是<Key,Value>鍵值對,MapReduce框架要求作業的輸入為一組<Key,Value>鍵值對,經過Map()、Reduce()計算,輸出的結果也是一組<Key,Value>鍵值對,這兩組鍵值對的類型可能不同。MapReduce框架需要對Key和Value對應的類進行序列化操作,因此,這些類需要實現Hadoop提供的Writable接口。另外,為了方便MapReduce框架執行排序操作,Key類必須實現WritableComparable接口。一個MapReduce作業的輸入和輸出類型如下所示。


在IDEA下運行MapReduce WordCount程序

1)建立Hadoop工程:新建Hadoop工程,並將hadoop-3.1.2/share/hadoop下的JAR包在Modules中引入工程中


(2)建立Mapper類:在項目中定義WordcountMapper類,並繼承Mapper實現文件的分詞操作

(3)建立Reducer類:在項目中定義WordcountReducer類,並繼承Reducer實現文件單詞的統計操作。由於Map階段的Key是單詞,MapReduce框架會將單詞相同(Key相同)的數據分發到同一個Reduce函數處理,因此Reduce函數只需要將獲取的單詞直接count即可

(4)建立Driver類:在工程中新建WordcountDriver類,並按照如下代碼實現客戶端Driver程序的建立。

(5)程序運行和結果驗證:在IDEA編譯器中單擊右鍵運行main()函數即可。①查詢輸入數據:在Hadoop根目錄下輸入以下命令查詢輸入數據。可以看到在input目錄下有很多.xml、.xsl、.cfg文件

②查詢輸出結果:在Hadoop根目錄下輸入以下命令查詢計算結果的數據。可以看到計算結果在output/1561192213573中。

關鍵字: