Spark原理及應用

勇者熱情生活家 發佈 2024-04-02T16:56:56.941270+00:00

Spark擁有Hadoop MapReduce所具有的優點,但不同於Hadoop MapReduce的是,Hadoop每次經過Job執行的中間結果都存儲到HDFS等磁碟上,而Spark的Job中間輸出結果可以保存在內存中,而不再需要讀寫HDFS。

Apache spark是通用的分布式大數據計算引擎。Spark是UC Berkeley AMPLab(美國加州大學伯克利分校的AMP實驗室)開源的通用並行框架。Spark擁有Hadoop MapReduce所具有的優點,但不同於Hadoop MapReduce的是,Hadoop每次經過Job執行的中間結果都存儲到HDFS等磁碟上,而Spark的Job中間輸出結果可以保存在內存中,而不再需要讀寫HDFS。因為內存的讀寫速度與磁碟的讀寫速度不在一個數量級上,所以Spark利用內存中的數據能更快速地完成數據的處理。Spark啟用了彈性分布式數據集(Resilient Distributed Dataset,RDD),除了能夠提高交互式查詢效率,還可以優化疊代器的工作負載。由於彈性分布式數據集的存在,使得數據挖掘與機器學習等需要疊代的MapReduce的算法更容易實現。

Spark的原理

Spark的特點

1.計算速度快

Spark將每個任務都構造成一個DAG(Directed Acyclic Graph,有向無環圖)來執行,其內部計算過程基於彈性分布式數據集在內存中對數據進行疊代計算,因此其運行效率很高。官方數據表明,如果計算的數據從磁碟上讀取,則Spark的速度是Hadoop MapReduce的10倍以上;如果計算的數據從內存中讀取,則Spark的計算速度是Hadoop MapReduce的100倍以上。

2.易於使用

Spark提供了80多個高級運算操作,支持豐富的算子,開發人員只需要按照其封裝好的API實現即可,不需要關心Spark的底層架構。同時,Spark支持多種語言開發,包括Java、Scala、Python。

3.通用大數據框架

Spark提供了多種類型的開發庫,包括Spark Core、Spark SQL(即時查詢)、Spark Streaming(實時流處理)、Spark MLlib、GraphX(圖計算),使得開發人員可以在同一個應用程式中無縫組合使用這些庫,而不用像傳統的大數據方案那樣將離線任務放在Hadoop MapReduce上運行,將實時流計算任務放在Storm上運行,並維護多個平台。Spark提供了從實時流計算、MapReduce離線計算、SQL計算、機器學習到圖計算的一站式整體解決方案。

4.支持多種資源管理器

Spark支持單機、Standalone、Hadoop YARN、Apache Mesos等多種資源管理器,用戶可以根據現有的大數據平台靈活地選擇運行模式。

5.Spark生態圈豐富

Spark生態圈以Spark Core為核心,支持從HDFS、S3、HBase等多種持久化層讀取數據。同時,Spark支持以Hadoop YARN、Apache Mesos和Standalone為資源管理器調度Job,完成Spark應用程式的計算。Spark應用程式可以基於不同的組件實現,如Spark Shell、Spark Submit、Spark Streaming、Spark SQL、BlinkDB(權衡查詢)、MLlib/MLbase(機器學習)、GraphX和SparkR(數學計算)等。Spark生態圈已經從大數據計算和數據挖掘擴展到機器學習、自然語言處理和語音識別等領域。

Spark的模塊

Spark基於Spark Core建立了Spark SQL、Spark Streaming、MLlib、GraphX、SparkR核心組件,基於不同組件可以實現不同的計算任務,這些計算任務的運行模式有:本地模式、獨立模式、Mesos模式、YARN模式。Spark任務的計算可以從HDFS、S3、Hypertable、HBase或Cassandra等多種數據源中存取數據。

Spark Core

Spark的核心功能實現包括基礎設施、存儲系統、調度系統和計算引擎。

(1)基礎設施:Spark中有很多基礎設施,這些基礎設施被Spark中的各種組件廣泛使用,包括SparkConf(配置信息)、SparkContext(Spark上下文)、Spark Rpc(遠程過程調用)、ListenerBus(事件總線)、MetricsSystem(度量系統)、SparkEnv(環境變量)等。

①SparkConf:SparkConf用於定義Spark應用程式的配置信息。

②SparkContext:SparkContext是Spark應用程式的入口,Spark應用程式的提交與執行離不開SparkContext。SparkContext隱藏了網絡通信、分布式部署、消息通信、存儲體系、計算引擎、度量系統、文件服務等內容,開發人員只需要使用SparkContext提供的API完成功能開發即可。

③Spark RPC:Spark組件之間的網絡通信依賴Spark RPC框架。Spark RPC基於Netty實現,使用中分同步和異步兩種方式。

④ListenerBus:ListenerBus即事件總線,主要用於SparkContext內部各組件之間的事件交互。ListenerBus屬於監聽者模式,採用異步調用的方式實現。

⑤MetricsSystem:MetricsSystem為度量系統,用於整個Spark集群中各個組件運行狀態的監控。度量系統由多種度量源和多種度量輸出組成。

⑥SparkEnv:SparkEnv為Spark的執行環境,SparkEnv內部封裝了RpcEnv(RPC環境)、序列化管理器、BroadcastManager(廣播管理器)、MapOutputTracker(Map任務輸出跟蹤器)、存儲系統、MetricsSystem(度量系統)、OutputCommitCoordinator(輸出提交協調器)等Spark程序運行所需要的基礎環境組件。

(2)存儲系統:Spark存儲系統用於管理Spark運行過程中依賴的數據的存儲方式和存儲位置。Spark存儲系統首先考慮在各節點的內存中存儲數據,當內存不足時會將數據存儲到磁碟上,這種內存優先的存儲策略使得Spark的計算性能無論在實時流計算還是在批量計算的場景下都表現很好。Spark的內存存儲空間和執行存儲空間之間的邊界可以靈活控制。

(3)調度系統:Spark調度系統主要由DAGScheduler和TaskScheduler組成。DAGScheduler負責創建Job、將DAG中的RDD劃分到不同Stage中、為Stage創建對應的Task、批量提交Task等。TaskScheduler負責按照FIFO(First Input First Output,先進先出)或者FAIR(公平調度)等調度算法對Task進行批量調度。

(4)計算引擎:計算引擎由內存管理器、任務管理器、Task、Shuffle管理器等組成。

Spark SQL

Spark SQL提供基於SQL的數據處理方式,使得分布式數據的處理變得更加簡單。此外,Spark提供了對Hive SQL的支持。

Spark Streaming

Spark Streaming提供流計算能力,支持Kafka、Flume、Kinesis和TCP等多種流式數據源。此外,Spark Streaming提供了基於時間窗口的批量流操作,用於對一定時間周期內的流數據執行批量處理。

GraphX

GraphX用於分布式圖計算。通過Pregel提供的API可以快速解決圖計算中的常見問題。

Spark MLlib

Spark MLlib為Spark的機器學習庫。Spark MLlib提供了統計、分類、回歸等多種機器學習算法的實現。其簡單易用的API接口降低了機器學習的門檻。

SparkR

SparkR是一個R語言包,提供了輕量級的基於R語言使用Spark的方式。SparkR實現了分布式的數據框,支持類似查詢、過濾及聚合的操作(類似R語言中的數據框包dplyr),使得基於R語言能夠更方便地處理大規模的數據集。同時,SparkR支持基於Spark MLlib進行機器學習。

Spark的運行原理

Spark的運行模式

Spark的運行模式主要包括Local模式、Standalone模式、On YARN、On Mesos和運行在AWS等公有雲平台上

Spark的集群架構

Spark的集群架構主要由Cluster Manager(管理器)、Worker(工作節點)、Executor(執行器)、Driver(驅動器)、Application(應用程式)5部分組成

(1)Cluster Manager:Spark集群管理器,主要用於整個集群資源的管理和分配。根據部署模式的不同,可以分為Local、Standalone、YARN、Mesos和AWS。

(2)Worker:Spark的工作節點,用於執行提交的任務。Worker的工作職責如下。

①通過註冊機制向Cluster Manager匯報自身的CPU和內存等資源使用信息。

②在Master的指示下創建並啟動Executor,Executor是真正的計算單元。

③將資源和任務進一步分配給Executor並運行。

④同步資源信息和Executor狀態信息給Cluster Manager。

(3)Executor:真正執行計算任務的組件,是某個Application運行在Worker上的一個進程。該進程負責Task的運行並且將運行的結果數據保存到內存或磁碟上。Task是運行在Executor上的任務單元,Spark應用程式最終被劃分為經過優化的多個Task的集合。

(4)Driver:Application的驅動程序,可以理解為驅動程序運行中的main()函數,Driver在運行過程中會創建SparkContext。Application通過Driver與Cluster Manager和Executor進行通信。Driver可以運行在Application上,也可以由Application提交給Cluster Manager,再由Cluster Manager安排Worker運行。Driver的主要職責如下。

①運行應用程式的main()函數。

②創建SparkContext。

③劃分RDD並生成DAG。

④構建Job並將每個Job都拆分為多個Task,這些Task的集合被稱為Stage。各個Stage相互獨立,由於Stage由多個Task構成,因此也被稱為Task Set。Job是由多個Task構建的並行計算任務,具體為Spark中的Action操作(例如collect、save等)。

⑤與Spark中的其他組件進行資源協調。

⑥生成並發送Task到Executor。

(5)Application:基於Spark API編寫的應用程式,其中包括實現Driver功能的代碼和在集群中多個節點上運行的Executor代碼。Application通過Spark API創建RDD、對RDD進行轉換、創建DAG、通過Driver將Application註冊到Cluster Manager。

Spark的運行流程

Spark的數據計算主要通過RDD的疊代完成,RDD是彈性分布式數據集,可以看作是對各種數據計算模型的統一抽象。在RDD的疊代計算過程中,其數據被分為多個分區並行計算,分區數量取決於應用程式設定的Partition數量,每個分區的數據都只會在一個Task上計算。所有分區可以在多個機器節點的Executor上並行執行。

(1)創建RDD對象,計算RDD之間的依賴關係,並將RDD生成一個DAG。

(2)DAGScheduler將DAG劃分為多個Stage,並將Stage對應的Task Set提交到集群管理中心。劃分Stage的一個主要依據是當前計算因子的輸入是否確定。如果確定,則將其分到同一個Stage中,避免多個Stage之間傳遞消息產生的系統資源開銷。

(3)TaskScheduler通過集群管理中心為每個Task都申請系統資源,並將Task提交到Worker。

(4)Worker的Executor執行具體的Task。

Spark的使用

Spark被廣泛應用於大數據行業的各個領域,包括實時流計算、歷史數據分析、機器學習、圖計算等。本節將從Spark的安裝、Spark RDD的使用、Spark Streaming的使用和Spark SQL的使用等方面來介紹Spark各個組件的特性。

Spark的安裝

這裡以Linux系統單機版為例介紹Spark的安裝方式,具體步驟如下。

(1)到官網下載最新的Spark安裝包,注意這裡下載Spark編譯好的帶Hadoop的版本,即spark-2.4.3-bin-hadoop2.7.tgz。

(2)將安裝包複製到安裝目錄下,執行以下命令解壓Spark安裝包。

(3)配置系統的Spark環境變量。

①執行以下命令編譯profile文件。

②在proFile文件的最後加上以下內容來設置Spark環境變量。

③鍵盤按下「Esc」,輸入冒號「:」加「wq」保存,退出vim編輯模式。

④執行以下命令使文件修改立刻生效。

(4)新建spark-env.sh配置文件。

①執行以下命令,進入Spark的conf目錄。

②執行以下命令,根據Spark提供的spark-env.sh.template模板文件複製一份新的名為spark-env.sh的配置文件。

③執行以下命令打開spark-env.sh文件,輸入vim進入編輯模式。

④在spark-env.sh文件的最後加上以下內容。

⑤鍵盤按下「Esc」,輸入冒號「:」加「wq」保存,退出vim編輯模式。

(5)新建slaves配置文件。

①執行以下命令,進入Spark的conf目錄。

②執行以下命令,根據Spark提供的slaves.template模板文件複製一份新的名為slaves的配置文件。

③執行以下命令打開slaves文件,輸入vim進入編輯模式。

④在slaves文件的最後加上以下內容,表示在Localhost伺服器上有一個Slave角色。

⑤鍵盤按下「Esc」,輸入冒號「:」加「wq」保存,退出vim編輯模式。

(6)啟動Spark。

①執行以下命令,進入Spark的sbin目錄。

②執行以下命令,啟動Spark,在啟動過程中會要求輸入Linux的登錄密碼,按照提示輸入即可。

③在啟動後,控制台會列印出以下日誌,提示日誌文件的目錄。

④查看Master日誌:在Spark啟動後會看到Master的核心日誌如下。

⑤查看Worker日誌:在Spark啟動後會看到Worker的核心日誌如下。

⑥Jps查看進程。

⑦在瀏覽器地址欄中輸入http://192.168.2.103:8080查看Master頁面。注意,192.168.2.103是筆者當前的伺服器IP位址

⑧在瀏覽器地址欄中輸入http://192.168.2.103:8081查看Worker頁面

(7)執行Spark默認example:進入Spark安裝目錄,執行以下命令,啟動Spark示例中的SparkPi任務。

Spark RDD的使用

1.RDD的介紹

RDD是Spark中最基本的數據抽象,代表一個不可變、可分區、元素可並行計算的集合。RDD具有自動容錯、位置感知性調度和可伸縮等特點。RDD允許用戶在執行多個查詢時顯式地將數據集緩存在內存中,後續查詢能夠重用該數據集,這極大地提升了查詢效率。

2.RDD的核心結構及概念

(1)Partition:RDD內部的數據集在邏輯上和物理上都被劃分為多個分區(Partition)以提高運行的效率,分區數量決定了計算的並行度,每一個分區內的數據都在一個單獨的任務中被執行,如果在計算過程中沒有指定分區數,那麼Spark會採用默認分區數量。默認分區數量為程序運行分配到的CPU核數

(2)Partitioner:Partitioner是RDD的分區函數。分區函數不但決定了RDD本身的分區數量,也決定了其父RDD Shuffle輸出時的分區數量。Spark實現了基於Hash(HashPartitioner)和基於範圍(RangePartitioner)的兩種分區函數。

注意:只有對於Key-Value的RDD才會有Partitioner,而非Key-Value的RDD的Parititioner值是None。

(3)RDD的依賴關係:RDD的每次轉換都會生成一個新的RDD,因此RDD之間會有前後依賴關係。當在計算過程中出現異常情況導致部分分區數據丟失時,Spark可以通過依賴關係從父RDD中重新計算丟失的分區數據,而不需要對RDD上的所有分區全部重新計算。RDD的依賴分為窄依賴和寬依賴。

◎窄依賴:如果父RDD的每個分區最多只能被子RDD的一個分區使用,則稱之為窄依賴。

◎寬依賴:如果父RDD的每個分區都可以被子RDD的多個分區使用,則稱之為寬依賴。

窄依賴的每個子RDD的Partition的生成操作都是可以並行的,而寬依賴則需要所有父Partition Shuffle結果完成後再被執行

4)Stage:Stage是由一組RDD組成的可進行優化的執行計劃。如果RDD的依賴關係為窄依賴,則可放在同一個Stage中運行;若RDD的依賴關係為寬依賴,則要劃分到不同Stage中。這樣,當Spark執行作業時,會按照Stage劃分不同的RDD,生成一個完整的最優的執行計劃,使每個Stage內的RDD都儘可能在各個節點上並行地被執行

(5)PreferredLocation:PreferredLocation是一個用於存儲每個Partition的優先位置的列表。對於每個HDFS文件來說,這個列表保存的是每個Partition所在的塊的位置,也就是該HDFS文件的「劃分點」。

(6)CheckPoint:CheckPoint是Spark提供的一種基於快照的緩存機制。當需要計算的RDD過多時,為了避免任務執行失敗後重新計算之前的RDD,可以對RDD做快照(CheckPoint)處理,檢查RDD是否被計算,並將結果持久化到磁碟或HDFS上。此外,Spark提供另一種緩存機制Cache,Cache緩存數據由Executor管理,當Executor消失時,Cache緩存的數據將被清除,而CheckPoint將數據保存到永久性磁碟或HDFS,當計算出現運行錯誤時,Job可以從CheckPoint點繼續計算。

3.創建一個RDD應用

在創建RDD應用前,首先需要創建一個Spark項目,下面以Java基於Maven的項目為例介紹RDD的創建。

(1)構建Maven項目。按照編譯器提示構建一個簡單的Maven項目,並打開pom.xml文件,將Spark依賴加入項目中。需要注意的是,Spark不再對Java 1.7進行維護,因此必須指明Maven源碼編譯和Target編譯均使用Java 1.8。

在pom.xml文件中加入Maven的編輯插件,具體代碼如下。

(2)新建RDD類。

通過上述代碼已經構建了一個簡單的Spark應用項目,在項目的Java目錄下新建一個名為RDDSimple的Java類,並在類中輸入下面代碼來構建一個簡單的RDD。

上述代碼定義了一個簡單的RDD,具體過程為:首先定義SparkConf實例conf,然後調用sc.parallelize方法將一個數組轉換為一組名為distDatardd的Spark RDD,最後通過調用distDatardd.reduce方法對distDatardd進行操作。上述代碼實現了對RDD中的數據進行加和操作然後輸出。

利用JavaSparkContext的parallelize方法將已經存在的一個集合轉換為RDD,集合中的數據將被複製到RDD並參與並行計算。並行集合的一個重要參數是分區數量(將數據集切割為多份)。Spark將為集群的每個分區都運行一個任務。一般希望集群中的每個CPU都有2~4個分區,這樣既能良好地利用CPU,又不至於任務太多導致任務阻塞等待。通常Spark會嘗試根據集群的CPU核數自動設置分區數量,也可以手動設置分區大小。設置代碼如下。

(3)打包和作業提交

在項目的根目錄下輸入如下命令對項目進行打包,打包後的程序在Target目錄下。

Spark的作業提交很簡單,只需要調用spark-submit命令指定主函數入口類並將編譯好的JAR包提交到集群即可,集群會自動為程序分配資源並執行。在提交的時候,首先需要通過--class指定Spark程序的入口,然後通過--master指定提交給哪個集群,最後跟上JAR包路徑即可。提交命令如下。

在作業提交後,通過控制台能看到程序輸出如下結果。

4.利用外部數據集生成RDD

Spark可以從Hadoop或者其他外部存儲系統創建RDD,包括本地文件系統、HDFS、Cassandra、HBase、S3等。Spark RDD支持多種文件格式,包括文本文件、SequenceFiles、JSON文件和任何其他Hadoop InputFormat。通過SparkContext的textFile方法讀取文本文件創建RDD的代碼如下。

在上述代碼中,textFile()方法的URI參數可以是本地文件、本地路徑、HDFS路徑、S3路徑等。如果該參數的輸入值是具體的文件,則Spark會讀取參數中的文件;如果是路徑,則Spark會讀取該路徑下的所有文件,並最終將其作為數據源加載到內存生成對應的RDD。

Spark加載數據的注意事項包括以下幾個方面。

(1)如果訪問的是本地文件路徑,則必須可以在工作節點上以相同路徑訪問該文件。一般做法是將數據文件遠程複製到所有工作節點對應的路徑下或使用共享文件系統實現。

(2)除了支持基於文件名的方式加載文件,Spark還支持基於目錄、壓縮文件和通配符的方式加載文件。例如,可以使用textFile("/my/directory")表示加載「/my/directory」路徑下的所有文件;使用textFile("/my/directory/*.txt")表示加載「/my/directory」路徑下所有以.txt為後綴名的文件;使用textFile("/my/directory/*.gz")表示加載並解壓「/my/directory」目錄下所有以.gz為後綴名的文件。

(3)Spark加載文件時可以設置分區數。Spark在默認情況下為每個文件塊都創建一個分區(HDFS中默認文件塊大小為128MB),也可以通過傳遞更大值來設置更多分區。需要注意的是,分區數不能小於文件塊的數量。

(4)除了加載文本文件,Spark的Java API還支持其他多種數據格式。

①wholeTextFiles:JavaSparkContext.wholeTextFiles允許客戶端程序讀取包含多個小文本文件的目錄,並將每個文件都以<文件名,內容>的鍵值對返回。該方法與textFile不同,textFile是將每個文件中的每行都作為一條記錄返回的。

②SequenceFiles:加載SequenceFiles需要使用SparkContext的sequenceFile[Key,Value]方法實現,其中,Key和Value是文件中鍵和值的類型。這些鍵值對對應的是Hadoop的Writable接口的子類,比如IntWritable和Text。

③Hadoop InputFormat:對於Hadoop InputFormat類型的數據,可以使用JavaSparkContext.hadoopRDD方法或JavaSparkContext.newAPIHadoopRDD加載並生成RDD。

④文件保存和序列化:JavaRDD.saveAsObjectFile方法和JavaSparkContext.objectFile方法採用Java默認序列化的方式將數據序列化並保存到RDD。同時,用戶可以在保存數據的時候,使用其他更高效的序列化方法(例如Avro、Kyro等)。

5.RDD的轉換和操作

RDD支持兩種類型的操作:轉換(Transformation)和操作(Action)。轉換指從現有RDD創建新RDD,操作指在RDD上運行計算並將計算結果返回驅動程序。例如,map是一個轉換:它將RDD的所有元素都調用map函數進行轉換處理,返回一個表示轉換結果的新RDD;reduce是一個操作:它將RDD的所有元素都調用reduce函數進行聚合操作,將最終計算結果返回驅動程序。表示聚合處理的函數還有reduceByKey、reduceBy等。

Spark中的所有轉換(Transformation)都是懶加載的,即不會立即執行轉換操作,而是先記錄RDD之間的轉換關係,僅當觸發操作(Action)時才會執行RDD的轉換操作,並將計算結果返回驅動程序。這種懶加載的設計使Spark能夠更加高效地運行。具體代碼如下。

上述代碼通過sc.textFile()方法定義了名為lines的RDD,此時文件並沒有加載到內存中,僅僅是指向文件的位置。通過lines.map()方法定義了名為lineLengths的map轉換,同樣由於懶加載機制,lineLengths不會立即執行計算。最終,當運行reduce操作時,Spark將RDD計算分解為不同Stage在不同機器上運行任務,每台機器都運行部分map數據集並將運行結果保存為本地的reduce,在各個節點都運算完成後將reduce結果返回driver程序並進行結果的合併。

6.RDD持久化的概念、級別和原則

Spark可以跨節點在內存中持久化RDD。當持久化RDD時,每個節點都會在內存中緩存計算後的分區數據,當其他操作需要使用該RDD時,可以直接重用該緩存數據,這使得之後的RDD計算速度更快(通常超過10倍)。緩存是疊代計算和交互式計算的關鍵。

應用程式可以使用persist()或cache()標記要緩存的RDD,當調用操作(Action)執行計算時,計算結果將被緩存在節點的內存中。Spark緩存具有容錯性,如果RDD的某個分區丟失,則該RDD將被自動重新計算。

每個持久化RDD都可以使用不同存儲級別進行存儲,Spark允許將數據集存儲在磁碟上或內存中。Spark將需要緩存的數據序列化為Java對象(序列化可以節省磁碟或內存空間),然後跨節點複製到其他節點上,以便其他節點重用該數據。Spark中緩存持久化級別是通過StorageLevel來設置的。具體代碼如下。

(1)Spark持久化的級別

①MEMORY_ONLY:使用未經過序列化的Java對象在內存中存儲RDD。當內存不夠時,將不會進行持久化;當下次需要該RDD時,再從源頭處重新計算。該策略是默認的持久化策略,當使用cache()時,使用的是該持久化策略。

②MEMORY_AND_DISK:使用未經過序列化的Java對象存儲RDD,優先嘗試將RDD保存在內存中。如果內存不夠,則會將RDD寫入磁碟文件;當下次需要該RDD時,從持久化的磁碟文件中讀取該RDD即可。

③MEMORY_ONLY_SER:MEMORY_ONLY_SER的含義與MEMORY_ONLY類似,唯一區別是MEMORY_ONLY_SER會將RDD中的數據進行序列化。在序列化過程中,RDD的每個Partition都將會被序列化成一個字節數組,這種方式更加節省內存,從而避免持久化的RDD占用過多內存導致JVM頻繁GC。

④MEMORY_AND_DISK_SER:MEMORY_AND_DISK_SER的含義與MEMORY_AND_DISK類似。唯一區別是MEMORY_AND_DISK_SER會將RDD中的數據進行序列化。在序列化過程中,RDD的每個Partition都會被序列化成一個字節數組。這種方式更加節省內存,從而避免持久化的RDD占用過多內存導致頻繁GC。

⑤DISK_ONLY:使用未序列化的Java對象將RDD全部寫入磁碟文件。

⑥MEMORY_ONLY_2和MEMORY_AND_DISK_2:對於上述任意一種持久化策略,如果加上後綴_2,代表的是將每個持久化的數據都複製一份副本,並將副本保存到其他節點上。這種基於副本的持久化機制主要用於容錯。假如某個節點掛掉,節點的內存或磁碟中的持久化數據丟失了,那麼後續對RDD計算時還可以使用該數據在其他節點上的副本。如果沒有副本,則只能將這些數據從頭重新計算一遍。

⑦OFF_HEAP:OFF_HEAP與MEMORY_ONLY_SER類似,但OFF_HEAP將數據存儲在堆外內存中。該參數需要Spark啟用堆外內存。

(2)持久化的原則

Spark提供了豐富的存儲級別,旨在通過不同存儲級別的設置實現內存和CPU的最佳使用,具體開發中該如何選擇持久化方案呢?以下為Spark官方提供的緩存持久化的選擇流程。

①如果RDD在默認存儲級別(MEMORY_ONLY)下運行良好,則建議使用MEMORY_ONLY。該級別是CPU效率最高的類型,基於CPU快速計算可以使RDD上的操作儘可能快地運行。

②如果系統顯示內存使用過高,則嘗試使用MEMORY_ONLY_SER,並選擇更快速的序列化庫,以加快序列化時間和節省對象的存儲空間。

③如果要快速恢復故障,則建議使用副本存儲級別。其他存儲級別需要通過重新計算丟失的數據來保障緩存的完整性,而副本存儲級別可以在其緩存對應的副本節點上直接執行任務,不用等待重新計算丟失的分區數據。

(3)刪除持久化緩存

Spark會自動監視每個節點上的緩存使用情況,並以LRU方式刪除舊的數據分區。如果想手動刪除RDD,則可通過RDD.unpersist()方法完成。

關鍵字: