spark原理和實踐

程序猿凱撒 發佈 2022-08-04T15:18:22.929234+00:00

一、大數據處理引擎Spark介紹1.大數據處理技術棧大數據的特性,數據是海量的,數據源是豐富多樣的,有消息,圖片,音視頻流,數據產生的非常快,需要快速處理,提高數據價值。

一、大數據處理引擎spark介紹

1.大數據處理技術棧

大數據的特性,數據是海量的,數據源是豐富多樣的,有消息,圖片,音視頻流,數據產生的非常快,需要快速處理,提高數據價值。 數據生成後,需要存儲元數據信息,選擇合適的存儲格式,像Parquet、ORC是兩種高性能的列式存儲,Hudi數據存儲的中間件,優化存儲的讀寫,也可以存儲到分布式文件存儲系統HDFS,分布式消息系統kafka,keyvalue分布式存儲的nosql引擎資料庫Hbase,基於列式存儲的分布式資料庫Kudu,字節提供的TOS,S3對象存儲。 存儲的數據需要計算才能使用,大數據的計算框架,Spark批式計算,Flink流式計算,Presto等處理在線分布式查詢場景的,是可交互式的OLAP引擎,計算框架藉助資源管理的編排調度工具YARN,K8S,來運行在分布式集群中處理存儲的數據。 計算處理存儲的數據後提供給上層應用,有BI報表,廣告,推薦,金融風控等。

2.常見大數據處理鏈路

資料庫中採集到的數據稱為數據源,存儲到分布式存儲系統中HDFS等,進行一系列的數據處理,會有多次數據讀取寫入,可以是各種存儲系統,也可以是各種資料庫,講處理的結果進行計算,再做各種應用。

3.開源大數據處理引擎

批式計算:Hadoop、Hive、Spark 流式計算:Flink OLAP:presto,ClickHouse,Impala,DORIS

MapReduce解決了Hadoop誕生,數據大規模處理數據,主流Spark是基於內存處理,對mapReduce進行了優化。

4.什麼是Spark?

Spark是用於大規模數據處理的統一分析引擎,是一種多語言引擎,可以用於單機節點或集群上來執行數據工程,數據科學和機器學習。

feature:

  1. 多語言選擇,用統一的方式處理流批的數據
  2. 可以用為儀錶盤執行快速的sql查詢分析,
  3. 適用於大規模的數據科學,對PB級別的數據來執行探索性的數據分析,對數據進行訓練建模預測。
  4. 機器學習,在單機上訓練機器學習的算法,可以很方便的拓展到大規模集群上

5.Spark版本演進

Spark生態&特點 -SparkCore 核心塊 基本功能:任務調度,內存管理 -SparkSQL,操作結構化數據的核心組件,直接查詢Hbase等各種各樣的數據源,可以進行交互式的查詢 -StruturedStreaming 流式計算框架,支持高吞吐的,可容錯處理的,實時流式數據 -MLlib,機器學習算法庫,分類,聚類,回歸等,模型評估, -GraphX分布式圖處理框架,提供圖計算,圖挖掘的一些API

  • 統一引擎,支持各種各樣分布式場景
  • 多語言支持
  • 支持豐富的數據源:內置DataSource,Text;Parquet/ORC;JSON/CSV;JSBC 自定義DataSource:Hbase/Mongo等
  • 豐富靈活的API/算子:RDD
  • 支持K8S/YARN/Meso資源調度

6.Spark運行架構&部署方式

集群管理器,負責管理整個集群,負責資源管理和調度,監控Woker節點, Worker從節點,負責控制計算節點 Deiver Program,是一個APP整個應用的管理者,負責作業的調度,是一個JVM進程,創建一個SparkContext上下文,控制整個應用的生命周期

Spark Local Mode 本地測試/單進程多線程模式

Spark Srandalone Mode 不需要藉助外部的資源調度管理, 需要啟動Spark的Sandalone集群的Master/Worker

依賴 YARN/K8S 依賴外部資源調度器 用master完成不同的部署方式,委託給誰資源管理

二、SparkCore原理解析

1.SparkCore

用戶選擇了集群提交到外部資源管理器,比如說提交到YARN,在mode management創建一個appmaster,用來管理資源,是否有資源創建Executor,APPmaster在YARN模式下相當於Driver也會通過DAG,Task Scheduler管理和分配Task。

Spark的數據輸入到輸出所有的數據結構都是基於RDD的,接下來從RDD開始說。

2.什麼是RDD

RDD是一個可以容錯的,並行執行的分布式數據集,最基本的數據處理模型。

  • 分區列表,每一個RDD都有多個分區,這些分區運行在集群不同節點上,每個分區都會被一個計算任務Task處理,分區決定了並行計算的數量。創建RDD可以指定分區個數,從集群創建的話,默認分區數是CPU核數;從HDFS文件存儲創建的話,Partition個數就是文件的blog數。
  • 都有一個計算函數,RDD以Partition為基本單位,每個RDD實現一個compute函數,對具體的RDD進行計算
  • 有依賴,每一個RDD都會依賴於RDD,每次轉換都會生成新的RDD,RDD會形成像Pipeline形成前後依賴關係。部分分區數據丟失時,Spark可以通過依賴關係重新計算分區數據,而不是對所有的RDD都進行重新計算
  • 實現了兩種類型分區函數,一個基於哈希的Hash partitioner,基於範圍的 range partitioner,兩種分區器。對於只有key-value的RDD,才會有分區partitioner,非key-value的RDD,partitioner的值是空的。Partitioner不但決定了RDD本身的分區數量,也決定了parent RDD shuffle時的shuffle分區數量。
  • 每個分區有一個優先的位置列表,他會存儲每個partition的優先位置,例如HDFS的文件會存儲每個partition塊的數據,移動數據不如移動計算,進行任務調度的時候,儘可能將計算分配到需要處理的任務塊的位置。

提供了各種各樣的算子,就是成員函數,map,filter返回新的RDD;count返回新的數據類型。cache,persist(緩存)當一個RDD被多次使用,這個RDD計算鏈路非常長,那麼計算結果就會非常珍貴。可以中間進行緩存,保存計算結果,這也是Spark速度快的原因,可以在內存中持久化緩存數據集。

如何創建RDD?

  • 有很多內置RDD
  • 自定義RDD
  • 有很多內置RDD
  • 自定義RDD

兩類RDD算子

  • Transform算子:生成一個新的RDD

比如map,filter,flatMap,groupByKey,reduceByKey.....

本來是一個Parallel的RDD,經過一個Map的操作後,變成了一個MapPartitionsRDD。

  • Action算子:觸發Job提交

比如collect、count、take、saveAsTextFile.....

本來是一個RDD,做了一個count,觸發了Job提交,返回了Long類型。take就是取前幾個元素,觸發了一個Job,返回了一個Array。

RDD依賴

  • 窄依賴:父RDD的partition至多對應一個子RDD的分區,但是子RDD可能有多個父RDD的分區。窄依賴分別有三種
OneToOneDependency,是一對一的,返回對應Partition的list
RangeDependency,inStart是父RDD起始的位置,outStart是子RDD起始的位置,length是range的長度,如果子RDD的partition的index在父RDD的range內,返回父RDD,返回父RDD的partition是子RDD的partition的index減去父RDD分區的range的起始,再加上子RDD分區range的起始
PruneDependency
  • 寬依賴(會產生Shuffle):父RDD的每個partition都可能對應多個子RDD分區,都會使用所有父RDD的多個分區,就相當於是one to many,當RDD做groupBy或Join操作時會產生寬依賴。

ShuffleDependency,Shuffle的產生是因為有寬依賴,寬依賴對應一個Shuffle的操作,運行過程中父RDD的分區,會傳入不同的子RDD分區中,中間就可能涉及到多個節點的數據傳輸。

如果子RDD故障,有可能一部分父RDD就可以覆蓋子RDD的計算,有時需要所有的父RDD進行重算,代價比較高。可以設置一個檢查點checkpoint,然後涉及容錯文件的系統工作,HDFS的檢查點,把這些數據寫入到檢查點上,做高可用的數據存儲,後面有節點宕機,數據丟失,可以從檢查點的RDD計算,不需要從頭到尾。

RDD執行流程

Job:RDD action算子觸發 Stage:依據寬依賴劃分 Task:Stage內執行單個partition任務 從後往前劃分,遇到一個寬依賴,劃分一個stage,遇到窄依賴就加入到這個stage,DAG最後一個階段生成的partition生成的task叫Result Task,其餘的satge叫ShuffleMapTask因為都有一個Shuffle操作。最後一個RDD partition的數量就決定了每個stage task的數量。

一個action算子怎麼去觸發Job,到如何調度?

調度器

  • 當一個RDD的算子創建之後,Sparkcontext就會根據RDD對象創建一個DAG有向無環圖,觸發job之後,將DAG提供給調度器,然後調度器根據ShuffleDependency分為不同的Stage,然後按照依賴順序調度Stage,為每個Stage生成TaskSet集合併分發到TaskScheduler。
  • 通過集群中的資源管理器,在K8S模式下是master,在YARN模式下是result manager根據調度算法(FIFO/FAIR)對多個TaskSet進行調度,對於調度到的TaskSet,會將Task調度(locality)到相關Executor上面執行。

3.內存管理

Executor內存主要有兩類:Storage、Execution

啟動Spark時,會設置一個spark.executor.memory參數,JVM的內存,堆內內存。 緩存RDD數據或者廣播數據,占用內存叫做存儲內存Storage Memory。在處理shuffle時占用的內存叫執行內存Execution Memory,剩餘的用戶自定義數據結構,還有一些Spark 內部的元數據,定義為User Memory。

前兩種內存可以互相借用,可以減少spill的操作,執行內存是不能夠被存儲內存所驅逐的,但執行內存需要內存時,可以驅逐被Storage借用的內存,直到達到一個規定的存儲內存的邊界。當雙方空間都不足時,都需要存儲到硬碟上。

為了進一步優化內存的使用,提高Shuffle的排序效率,Spark引入了堆外內存(。可以直接操作作業系統堆外內存,減少不必要的內存開銷,掃描回收等,管理難度低,誤差比較小。

多任務間內存分配

在一個Executor內,所有的task是共享內存的,UnifiedMemoryManager統一管理多個並發Task的內存分配,Executor下可以運行多個task,每個task至少要獲取1/2n的空間,如果不能滿足,任務就會被阻塞,直到有足夠的空間,任務才會被喚醒,n為當前Executor中正在並發運行的task數量。

Shuffle

map跟reduce之間數據處理重新分發的過程稱之為Shuffle,創建一個ShuffleManger,在shuffleRDD compute邏輯執行的時候會從,env裡面get實現具體函數。

SortShuffleManger

SortShuffleManger是Manager的一個實現方式,這個是兩個數據量表大的時候常用的方法。對數據進行排序,Spill,產生多個零時的Spill磁碟文件,SpillFile,在最後,零時文件會合併成一個磁碟文件,為這些磁碟文件加一個索引,會產生兩個文件,一個數據文件,一個索引文件。在下一個stage來的時候,在索引文件中找到partition所在的數據文件的位置,再在數據文件中找到數據。

External Shuffle Service

External Shuffle Service運行在主機上,管理這台主機Executor節點產生的shuffle數據,在YARN上就是NodeManager管理,只處理來自於map節點和reduce節點的請求,map節點會將Shuffle的文件路徑告訴shuffle service,reduce端去讀取數據的時候就會發送一個請求獲取stream id,再去獲取數據。

三、SparkSQL原理解析

1.SparkSQL執行過程

  • SQL Parse: 將SparkSQL字符串或DataFrame解析為一個抽象語法樹/AST,即Unresolved Logical Plan
  • Analysis:遍歷整個AST,並對AST上的每個節點進行數據類型的綁定以及函數綁定,然後根據元數據信息Catalog對數據表中的欄位進行解析。 利用Catalog信息將Unresolved Logical Plan解析成Analyzed Logical plan
  • Logical Optimization:該模塊是Catalyst的核心,主要分為RBO和CBO兩種優化策略,其中RBO是基於規則優化,CBO是基於代價優化。 利用一些規則將Analyzed Logical plan解析成Optimized Logic plan
  • Physical Planning: Logical plan是不能被spark執行的,這個過程是把Logic plan轉換為多個Physical plans物理執行計劃
  • CostModel: 主要根據過去的性能統計數據,選擇最佳的物理執行計劃(Selected Physical Plan)。
  • Code Generation: SQL邏輯生成Java字節碼

影響SparkSQL性能兩大技術:

  1. Optimizer:執行計劃的優化,目標是找出最優的執行計劃
  2. runtime:運行時優化,目標是在既定的執行計劃下儘可能快的執行完畢。

2. Catalyst優化

  1. Rule Based Optimizer(RBO): 基於規則優化,對語法樹進行一次遍歷,模式匹配能夠滿足特定規則的節點,再進行相應的等價轉換。
  2. Cost Based Optimizer(CBO): 基於代價優化,根據優化規則對關係表達式進行轉換,生成多個執行計劃,然後CBO會通過根據統計信息(Statistics)和代價模型(Cost Model)計算各種可能執行計劃的代價,從中選用COST最低的執行方案,作為實際運行方案。CBO依賴資料庫對象的統計信息,統計信息的準確與否會影響CBO做出最優的選擇。

3.AQE

AQE對於整體的Spark SQL的執行過程做了相應的調整和優化,它最大的亮點是可以根據已經完成的計劃結點真實且精確的執行統計結果來不停的反饋並重新優化剩下的執行計劃。

AQE框架三種優化場景(3.0版本):

  • 動態合併shuffle分區(Dynamically coalescing shuffle partitions)
  • 動態調整Join策略(Dynamically switching join strategies)
  • 動態優化數據傾斜Join(Dynamically optimizing skew joins)

處理的數據是比較大的,shuffle比較影響性能,這個算子比較費時,partition的個數非常關鍵,很難確定partition的數目應該是多少

4.RuntimeFilter

在join中形成filter算子進行優化。實現在Catalyst中。動態獲取Filter內容做相關優化,當我們將一張大表和一張小表等值連接時,我們可以從小表側收集一些統計信息,並在執行join前將其用於大表的掃描,進行分區修剪或數據過濾。

Runtime優化分兩類:

  1. 全局優化:從提升全局資源利用率、消除數據傾斜、降低IO等角度做優化。包括AQE。
  2. 局部優化:提高某個task的執行效率,主要從提高CPU與內存利用率的角度進行優化。依賴Codegen技術。

5.Codegen

前面是優化執行計劃,找出最優的執行計劃。另一種是在運行時優化,如何儘快的完成優化,提高全局利用率,消除數據傾斜,提高數據利用率。另一方面就是Codegen,局部優化,提高task的效率,從提高CPU與內存的利用率的角度來進行runtime優化。

  1. Expression級別

表達式常規遞歸求值語法樹。需要做很多類型匹配、虛函數調用、對象創建等額外邏輯,這些overhead遠超對表達式求值本身,為了消除這些overhead,Spark Codegen直接拼成求值表達式的java代碼並進行即時編譯

  1. WholeStage級別

傳統的火山模型:SQL經過解析會生成一顆查詢樹,查詢樹的每個節點為operator,火山模型把operator看成疊代器,每個疊代器提供一個next()接口。通過自頂向下的調用 next 接口,數據則自底向上的被拉取處理,火山模型的這種處理方式也稱為拉取執行模型,每個Operator 只要關心自己的處理邏輯即可,耦合性低。

火山模型問題:數據以行為單位進行處理,不利於CPU cache 發揮作用;每處理一行需要調用多次next() 函數,而next()為虛函數調用。會有大量類型轉換和虛函數調用。虛函數調用會導致CPU分支預測失敗,從而導致嚴重的性能回退

Spark WholestageCodegen:為了消除這些overhead,會為物理計劃生成類型確定的java代碼。並進行即時編譯和執行。

Codegen打破了Stage內部算子間的界限,拼出來跟原來的邏輯保持一致的裸的代碼(通常是一個大循環)然後把拼成的代碼編譯成可執行文件。

四、業界挑戰與實踐

1.Shuffle穩定問題

在大規模作業下,存在本地磁碟上,沒有備份,有大量的請求,數量級很大,有熱點數據反覆讀,spill數據會帶來寫放大,reduce高並發讀取小數據塊會帶來磁碟隨機訪問的問題,也是低效率的問題,NodeManager的模塊也會經常JC。 解決方案:

  • 數據遠端存儲
  • 在partition端shuffle的時候進行聚合,少了很多排序的操作

2.SQL執行性能問題

CPU流水線/分支預測/亂序執行/CPU緩存友好/SIMD/... 向量法/Codegen/兩者結合,例如Intel:OAP

3.參數推薦/作業診斷

Spark參數很多,參數不合理的作業,對資源利用率/Shuffle穩定性/性能有非常大影響。 自動化參數推薦/作業診斷——自動化

關鍵字: