Presto+Alluxio 加速 Iceberg 數據湖訪問

datafuntalk 發佈 2024-05-07T09:36:17.251715+00:00

早先是數據倉庫 data warehouse 即 Hive 數據倉庫,之後出現了 Hudi 和 Iceberg,有一些公司用 Presto 查詢 Kafka ,還有 Druid 等等。

導讀 本文將分享 Alluxio 社區和 Presto 社區在數據湖方面的一些工作,主要聚焦 Iceberg。

文章包括以下幾個部分:

1. Presto & Alluxio

2. Alluxio & Iceberg

3. 最佳實踐

4. 未來的工作


分享嘉賓|王北南博士 Alluxio 軟體工程師

編輯整理|唐洪超 敏捷雲

出品社區|DataFun


01

Presto & Alluxio

1. Presto Overview

Presto 是一個里程碑式的產品,它能夠讓我們很簡單的不需要數據的導入和導出,就可以使用標準的 SQL 來查詢數據湖倉上的數據。早先是數據倉庫 data warehouse 即 hive 數據倉庫,之後出現了 Hudi 和 Iceberg,有一些公司用 Presto 查詢 Kafka ,還有 Druid 等等。Druid 很快,但是可能對 Join 支持不好,可以用 Presto 直接查詢 Druid 一步到位,然後通過一些計算的 pushdown,能夠讓 Druid 中有些跑得比較困難的任務得到很好的運行。

Presto 中有一個概念叫做交互式的查詢,即在幾秒種最多幾分鐘返回一個結果。現實中很多人用 Presto 來做秒級查詢,即 subsecond 的查詢,一秒鐘返回結果,得出一些很快很高效的 dashboard。也有人用 Presto 來處理一些幾小時的 job,甚至用 Presto 來部分取代 ETL,通過 SQL 語句就能直接處理數據,簡單易用。Presto 處理的數據量為 PB 級,在日常的使用中,一般一個 Presto 集群,一天處理幾十個 PB 的數據,還是很容易的。當然,集群越多,處理的數據量也越大。

目前 Presto 有兩個開源的社區,一個是 prestodb,此社區主要是由 Facebook 領導的社區,包括 uber、Twitter,以及國內公司 TikTok,騰訊都有參與。

另一個社區是 trinodb,prestodb 分出去之後,新建的開源社區更加的活躍。此社區背後的商用公司叫 starburst,這個社區更加活躍,用戶會更多一些,但是 prestodb 背後的大廠多一些。

Presto 目前的使用場景有很多,很多數據科學家和數據工程師通過 SQL 查詢想要的數據;一些公司決策使用的 BI 工具,比如 tableau 和 zeppelin;公司決策需要報表和 dashboard,這些 query 可能需要在幾秒鐘快速地完成,將數據展示出來,比如廣告的轉化率和活躍用戶,這些數據需要實時或准實時的反饋出來;還有一個場景就是 A/B testing,因為它的好處就是很快,結果能夠很快的反饋回來;最後一個是 ETL,ETL 是很多公司的數據倉庫或者數據平台的基石,非常重要,但是 Presto 並不是特別適合在這個領域,雖然很多人使用 Presto 來處理一些 ETL 的 job,但是 Presto 並不是一個很容錯的系統,如果計算過程中間壞掉,整個查詢可能就要從頭開始了。

下圖展示了 Presto 發展的歷史。

2. Presto 主體架構

上圖是 Presto 的主體架構,coordinator 如同一個 master,負責調度,當有一個查詢進來時,把 SQL 解析生成查詢的 plan,然後根據 plan 分發給若干個 worker 執行。根據不同的運算性質,每個 worker 去查對應的數據源,數據源可能是 Hive 數倉,也可能是數據湖 Iceberg 或者 Hudi,不同的數據源對應不同的 connector。connector 在使用的時候,其實在 Presto 里就像一個 catalog 一個 namespace。比如在 SQL 中查詢 Hive 數據倉庫中的部門表,通過 hive.ADS.tablename 就可以把這個 table 找到。

由於 Presto 有著多個 connector 和 catalog,天生能夠提供數據的 federation,即聯合。可以在 Presto 中聯合不同的數據源,可以來自 Hive 、Iceberg 、Kafka 、Druid、mysql 等各式各樣的數據源,並把來自多個數據源的數據 join 到一起。Presto很靈活,如很多人還把 Hive 的表跟 Google 的 spreadsheet 表格 join 到一起。

目前 presto 主要的數據來源可能 95% 甚至 99% 是來自 Hive 。當然現在也有些變化了,由於數據湖的崛起,可能越來越多流量會轉向數據湖 Iceberg 和 Hudi。

3. Presto + Alluxio Overview

Presto 訪問數據源就是通過直連的方式,比如要訪問 HDFS 就連到 HDFS 上。有的公司可能數據源太多,可能有十幾個 HDFS 的集群,這時候 presto 需要一個統一的命名空間,此時 Presto 可以提供一個聯合,在物理的數據層上面提供一個抽象層,看起來就像是一個 cluster,然後在 Presto 中呈現出來的就是一個統一的命名空間,這個功能還是挺方便的。

4. Presto 與 Alluxio 結合

Presto 查數據並不是把數據給吃進來,而是訪問數據的原始的存儲,數據存儲在 HDFS 就訪問 HDFS,當 SQL 查詢進來後翻譯完,去到這個 Hive Metastore 中拿到元數據,通過元數據找到表數據存儲在哪個目錄中,將該目錄分開,然後讓每個 worker 讀取若干的文件去計算結果。在結合 Alluxio 的工作時,改變了緩存路徑。

其實在商用版本有更好的一個功能。可以不改變這個路徑,還是這個 S3 路徑,但它其實使用了本地的 Alluxio,當然這在我們資料庫中遇到一些麻煩,因為資料庫中 expert 文件裡邊是 hard code 而不是死的路徑,為緩存帶來了一些麻煩,我們通過轉換,讓本來是訪問原始數據的存儲,通過 election 變成訪問本地的數據源,得到提速的效果。

5. Co-located deployment

我們提出提供了另外一種部署的方式。我們把 Presto worker 和 Alluxio worker 部署在同一台物理機上。這樣保證了數據的本地性。確保數據加載到了 Presto worker 的本地。這裡 Presto DB 上有更精簡的實現方式 ,在 to local cache 項目中,有 local cache 實現數據的本地化,通過數據本地化省掉網絡傳輸。對於 Alluxio 就是 Co-located 的部署方式。它跟 HDFS 相比也省掉了一次網絡的傳輸。

6. Disaggregated deployment

國內很多公司使用數據一體機,將 Presto、Spark、HDFS、 ClickHouse 等都放到一起。針對這種情況,推薦的實現就是用 in memory 的 Lark show 的 local cache,會有非常好的提速,即 local cache 結合 Alluxio worker ,能有百分之四五十的提速。缺點在於這種實現需要使用很多的內存,數據緩存在內存中,通過 SSD 或者內存來給 HDD 或者慢速的 SSD 做一個提速。這種方式即 Alluxio worker 跟 Presto worker 捆綁到了一起,200 個 Presto worker節點,就需要 200 個 Alluxio worker,這種方式會導致拓展的時候可能出現問題。

所以當數據量特別巨大,且跨數據中心訪問的時候,更推薦分離式 disaggregated 的部署方式。

--

02

Alluxio & Iceberg



Hive 數據倉庫已經有十幾年的歷史了,但是一直存在著一些問題,對於一個表的 Schema 經常有多人的改動,且改動往往不按規律改,原來是簡單類型,改成了複雜類型,導致無法保證數據的一致性,如果一條 SQL 查詢兩年的數據,這個表很可能兩年中改了好幾次,可能很多列的類型也改了,名字也改了,甚至可能刪掉或者又加回來,這就會導致 Presto 報錯,即使 Spark 也很難在數據 Schema 修改的過程中做到完全兼容。這是各個計算引擎的通病。

其實最早我們討論 Iceberg 這個方案的時候,最想解決的就是 Schema 的升級變化問題,另外想解決的就是數據版本的一致性問題。眾所周知,數據可能中間會出錯,此時需要數據回滾從而查看上一個版本的數據,也可能要做一些 time travel 查指定時間版本的數據。有些數據是追加的,可以通過 partition 按時間來分區,通過 partition 查詢指定時間分區數據。有的數據集是快照數據集,數據後一天覆蓋前一天,歷史數據無法保留,而 Iceberg 能解決這個問題。

其實 Iceberg 並沒有提供一個新的數據存儲,它更多的是提供一個數據的組織方式。數據的存儲還是像 Hive 的數倉一樣,存在 parquet 或者 ORC 中,Iceberg 支持這兩種數據格式。

當然很多時候為了能使用 export table,我們會把一些原始的數據 CSV 或者其他格式導進來變成一個 expert table,根據分區重新組織寫入 parquet 或者 ORC 文件。

關於 Schema 的 evolution 是一個痛點,Presto 支持讀和寫,但是目前用 Presto 寫 Iceberg 的不多,主要還是用 Presto 讀,用 Spark 來寫,這給我們的 Alluxio to Iceberg 結合造成了一定的麻煩。

1. Alluxio + Iceberg Architecture 方案

  • 方案一:

所有的操作都通過 Alluxio 寫,Spark 和 Presto 將 Alluxio 作為一個底層存儲,從而充分保證數據的一致性。

弊端是,實施該方案的公司稍微大了之後,數據直接往 S3 或 HDFS 寫,不通過 Alluxio。

  • 方案二:

讀寫都通過 Alluxio,通過自動同步元數據,保證拿到最新數據,此方案基本可用,不過還需 Spark 社區、Iceberg 社區以及 Presto 社區繼續合作來把數據一致性做得更好。

--

03

最佳實踐

1. Iceberg Native Catalog

目前,與 cache 結合比較好的是使用 Iceberg native catalog,在 Iceberg 叫 Hadoop catalog,在 Presto 中叫 native catalog,如果使用最原始的 Hive catalog,則 table 的元數據,即 table 位置的數據是放在 Hive-Metastore 中,Presto 或者 Spark 訪問表的時候先去查詢 Hive-Metastore 獲取表的存儲路徑,然後通過 Iceberg 將數據文件加載進來,但是實際上,table 會有變更,此時需要將 Hive-Metastore 上鎖,這種方案在只有一個 Hive-Metastore 的時候才有效,如果面臨多個 Hive-Metastore 會出現鎖失效的問題。

更好的一個方案是 Iceberg native catalog,即完全拋棄 Hive-Metastore,使用一個目錄來存儲這個 table 的列表,這個目錄可以在 HDFS 上或者 S3 上,我們更加推薦 HDFS,因為 HDFS 效果好一些,一致性也強一些。這一方案避免了 Hive-Metastore service 本身的很多問題,如 scalability 、延時。此方案對 cache 也比較友好,不需要做一個 metadata 的 cache,而是直接 cache 存放 metadata 的目錄。

2. Iceberg Local Cache

Local Cache 的實現是 Presto DB 的 RaptorX 項目,是給 Hive connector 做 Local Cache,很容易就可以給 Iceberg connector 也來打開這個 Local Cache。相當於是 cache 了 parquet 的文件到 local 的 SSD 上,Prestoworker,worker 上的 SSD 其實本來是閒置的,通過它來緩存數據效果還是挺好的。它可以提速,但我們目前還沒有特別好的官方 benchmark。

目前只是對 worker 進行 cache,metadata coordinator 是不開的,打開的話可能會有數據一致性的問題。

3. 數據加密

早先 parquet 文件是不加密的,cache 了 parquet 文件,雖然不是明文,但只要你知道怎麼讀取這個 parquet 文件格式就能把所有數據讀取出來。其 magic number 原來是 pare 1 就代表第一個版本,現在增加了一個 magic number 即 pare 加密的版本,這個加密版本把一些加密的信和 metadata 存在 footer 裡邊,它可以選擇對一些 column 和配置進行加密。加密好後,數據便不再是明文的了,如果沒有對應的 key,就無法讀取出數據。

通過對 parquet 加密,我們不再需要第三方的加密,也不需要對整個文件加密,可以只對需要加密的一些數據進行加密,這個方案也解決了另外一個重要的問題,就是有的公司其實是整個文件來加密存放在 HDFS,然後 Presto 讀之前把它解密好,很多文件存儲系統就是存的時候是加密的。讀取的時候確實拿到的解密好的數據,當 Presto 再通過 Local Cache 緩存數據的時候,cache 里存儲還是明文數據,這破壞了數據加密的管理。但是採用 parquet 內部加密,local cache 就可以滿足數據加密的要求了。

4. 謂詞下推

Iceberg 通過謂詞下推(Predicate Pushdown)可以減少查詢的數據量。

原來 Presto 的暴力查詢,根據條件把符合條件的一條條數據挑出來,但是中間有優化。其實很多查詢條件可以直接 push 到 Iceberg,Iceberg 讀取文件的範圍就小了。

下面是一個 benchmark,可以看到沒有謂詞下推前掃到了 200 萬條記錄,CPU time 是 62 毫秒。謂詞下推後,掃到了一條記錄,查詢時間極大的縮短,這也是對緩存的一個優化。開謂詞下推(Predicate Pushdown)功能後,我們發現,緩存層次夠用,掃的文件少了很多,這意味著我們都可以緩存的下了,命中率有一個提高。

--

04

未來的工作

在前面的工作中我們發現系統的瓶頸在 CPU。此瓶體現在很多地方,其中很大一部分是對 parquet 文件的解析,parquet 文件解析任務太重了。由於 parquet 很節約資源,很難將 parquet 轉換為更好的格式。此時,一種解決方案是將數據分為冷熱數據,將較熱的數據轉換為更加輕量,序列化低的格式存到緩存中,通過實驗,將 parquet 文件反序列好的數據直接放到內存中,效率提升 8% 到 10% 。

但這有一個問題,此方案對 Java 的 GC 壓力非常大,因為緩存長時間存在。我們發現此方案並不是那麼好實施,所以我們更加想用 off heap 的方式,將數據存在 heap 之外。此時不能 cache object 本身,需要 cache Arrow 或者 flat buffer 格式,這兩種格式反序列成本極低,又是二進位的流存在內存中,通過 off heap 把它裝進來,然後在 Java 中再反序列化,這樣可以達到一個很好的提速效果。

另外我們也可以把一些算子 pushdown 到 native 實現存儲。比如說 Alluxio 再增加一些實現 native 的 worker 和客戶端的 cache 實現,我們將算子直接 pushdown 過去,就像前面 Iceberg pushdown 一樣,有些計算 push 到存儲,存儲返回來的結果特別少,它幫你計算,而且格式更好,它是 Arrow 並可以有 native 的實現,也可以向量化的計算。

Java 也能向量化計算。但問題在於 Java 的版本要求比較高,需要 Java16 或 17,而現在 Presto DB 還在 Java 11,trainer 倒是可以了,但是這個效果也不是特別好,因為 Presto 和 trainer 內存中的格式對性能化計算不友好,而且這個格式基本上是不能動的,如果要動,基本上全都要重新實現,這也是為什麼會有這個 vlogs 在那裡的原因。

可能這個 Presto 以後會有格式轉換,但是不在眼前,但是我們可以 off heap 的緩存,可以把這個 Arrow 緩存到 off heap 上,然後在那裡邊需要的時候把它拿出來。然後反序列化成 page,然後給 Presto 進行進一步的計算。這個開發正在進行,可能在將來會給大家展現一部分的工作。其實就是為了降低 CPU 的使用和系統的延時,降低 GC 的開銷,讓系統變得更加的穩定。

今天的分享就到這裡,謝謝大家。


|分享嘉賓|


|《數據智能知識地圖》下載|

上下滑動⬆️⬇️,查看《數據智能知識地圖》雲原生大數據模塊,完整版請關注公眾號「大話數智下載


|DataFun新媒體矩陣|


|關於DataFun|

專注於大數據、人工智慧技術應用的分享與交流。發起於2017年,在北京、上海、深圳、杭州等城市舉辦超過100+線下和100+線上沙龍、論壇及峰會,已邀請超過2000位專家和學者參與分享。其公眾號 DataFunTalk 累計生產原創文章900+,百萬+閱讀,16萬+精準粉絲。

關鍵字: