基於對象存儲的離線大數據處理架構和應用實踐

運營增長 發佈 2020-06-15T09:32:03+00:00

熵簡科技大數據處理系統目前已經累計完成 3.7 PB 規模的大數據處理和分析,覆蓋了超 2000+ 數據源,涉及豐富的數據類型,如宏觀經濟數據、電商招聘等另類數據、研報新聞等文本類數據。

熵簡科技大數據處理系統目前已經累計完成 3.7 PB 規模的大數據處理和分析,覆蓋了超 2000+ 數據源,涉及豐富的數據類型,如宏觀經濟數據、電商招聘等另類數據、研報新聞等文本類數據。這背後,是一個處理PB級數據的離線大數據處理架構。

本文從離線大數據處理的數據存儲選型要點入手,詳細介紹如何構建一套基於對象存儲的離線大數據處理框架。同時,作為彩蛋,我們在文章最後一部分,介紹了基於該大數據處理框架的電商大數據實踐案例,該數據集已被廣泛應用於金融投資分析中。

作者信息

本文出自熵簡科技大數據團隊,團隊致力於構建高效率、低成本和低運維的大數據處理系統,為熵簡科技應用層及中台層各項服務提供海量的算力支持,涉及數據清洗、數據融合、數據核驗、數據建模等多種處理能力。

01

背景

隨著大數據時代的到來,當前網際網路上的各類數據急劇膨脹。龐雜的信息需要經過採集、清洗、分析才能形成真正有價值的信息,從而為各類機構和企業提供決策依據。作為一家提供全域數據智能服務的科技公司,熵簡科技的數據服務是所有業務的核心基礎。


目前,熵簡科技大數據處理系統已經累計完成 3.7 PB 規模的大數據處理和分析,覆蓋了超 2000+ 數據源,涉及豐富的數據類型,如宏觀經濟數據、電商招聘等另類數據、研報新聞等文本類數據。


這背後,是一個處理PB級數據的離線大數據處理架構。在這個PB級離線數據處理的架構中,最重要的兩個部分便是數據存儲和計算引擎,各自的核心需求如下:


1.1 數據存儲


原始數據是來自於網際網路的海量數據,其特點是數據量大,並且隨著時間的推移不斷的膨脹。


很多原始數據是只有在特定的時間內才能採集到的,如果丟失將不能進行恢復,是一個不可挽回的損失。


因此,數據存儲的要求要保證:


1、大量數據的存儲(月增幅TB級)

2、強大的IO能力

3、支持增量數據的追加寫入和讀取

4、穩定可靠


1.2 計算引擎


數據處理需要應對的事情是面對不斷膨脹的數據不僅僅需要進行新增數據的處理,還要面臨可能因為算法調整需要重新計算所有歷史數據的情況。


因此,數據處理要求保證:


1、大量數據處理的能力(PB級別)

2、高效的數據處理能力(月度或者日度的數據更新需要在小時級別處理完畢)

3、穩定、魯棒性高,在出錯的情況下可以迅速進行調整和步驟級別恢復

4、開發和學習成本低

02

數據存儲技術選型

2.1 對象存儲介紹


對象存儲是一種可以水平擴展的分布式數據存儲,其中每個數據單元存儲稱為一個對象,實際上可以是任何類型和任何大小的數據。對象存儲中的所有對象都存儲在單個平面地址空間中,而沒有文件夾層次結構。


當前市面上最有名的對象存儲非 AWS S3 莫屬,阿里雲也提供了類似的存儲服務 Aliyun OSS。其特點就是高擴展性(幾乎無限的存儲空間),可用性和持久性(11個9的持久性保證),高性能,低成本,豐富的支持(Hadoop s3a等)。基於雲服務商提供的對象存儲服務,使用 RESTful API 可以在網際網路任何位置存儲和訪問數據,同時也能選擇多種存儲類型以全面優化存儲成本。


除了雲服務商提供的對象存儲服務之外,開源社區也提供了對象存儲服務,MinIO 是其中的佼佼者。MinIO 是一個基於 Apache License v2.0 開源協議的對象存儲服務。它兼容 AWS S3 雲存儲服務接口,非常適合於存儲大容量非結構化的數據,例如圖片、視頻、日誌文件、備份數據和容器/虛擬機鏡像等,一個對象文件可以是任意大小,從數 KB 到最大 5TB 不等。


豐富的雲廠商支持和開源社區支持,保證了使用對象存儲作為存儲介質具有了私有化部署的可能,也為 ToB 的大數據服務提供了更多的技術方案。


2.2 存儲成本對比


與文件類型和塊類型存儲不同,對象存儲沒有歸檔層次結構,並且元數據完全可自定義,因此與文件或塊存儲相比,硬體限制要少得多;加上對象存儲的橫向擴展性,其存儲成本遠遠低於其他類型的存儲。


我們以阿里雲上的 OSS、Table Storage、文件存儲NAS、普通雲盤四類存儲服務為例,對比一下各自的價格:


註:以上數據均來自阿里雲的各項存儲服務的公開標價。


從上表中可以看出來,對象存儲無疑是性價比之王,其標準存儲的價格僅為普通雲盤的一半。而雲服務商提供的低頻存儲和歸檔存儲類型,價格更低(一般為三分之一的價格)。存儲類型可以方便的切換和靈活的配置,大大降低了成本。


2.3 數據管道


在對大數據清洗的過程中,數據管道通常指數據來源到計算引擎之前的數據傳遞渠道,主要負責給計算引擎提供數據以及保存計算結果。由於採集系統或上游系統每時每刻都在產生大量的數據,計算引擎讀取數據的速度一定要高於上游系統的寫入速度。以 Apache Spark 為例,Spark應用的耗時有很大一部分花在讀取數據上,因此一個高效的數據處理流程,對數據管道的要求也是極高,數據管道需要極佳的 IO 性能。在做大數據離線清洗中,常見的數據源包括關係型資料庫、Kafka 和對象存儲。

關係型資料庫


關係型資料庫無法應付大量的非結構化數據,大規模的存儲和高並發訪問會導致資料庫負載過高最終產生性能瓶頸。


實踐中我們嘗試過 2TB 的 RDS 資料庫作為數據管道,一旦超過千萬級的數據量,讀取整張表將要花費近小時級的時間,而同樣的讀取在對象存儲中只需要數分鐘。這樣的差距放在 10 億級別的數據量上更是無法接受的。

Kafka 等消息隊列


Kafka 在大數據量方面上則占據了優勢,高吞吐量,高並發讀寫,無疑是流式數據處理中絕佳的選擇,但是對於離線大數據處理,其最大的問題是存儲成本高、運維成本高、且不適合保存歷史冷數據。

對象存儲


使用對象存儲作為中間數據管道,可以帶來很多好處。


從存儲成本分析,對象存儲在存儲方式上可按照對象訪問量、創建時間來區分存儲類型,可以很方便地將歷史數據做降級存儲,有效降低存儲成本。


從並發量分析,對象存儲的並發性能極佳,特別是使用雲服務商的對象存儲服務時,例如阿里雲的 OSS、AWS 的 S3 等,不需要考慮並發量的上限。


從運維上考慮,無論是開源的 MinIO,或者是雲服務商提供的對象存儲服務,運維都極其簡單,人力成本幾乎為零。


此外,使用對象存儲可以將結構化數據和非結構化數據採用統一的方式存儲,避免了在使用資料庫類解決方案時,文本、音視頻等文件需要單獨存儲的痛點。


但對象存儲同時也帶來了一些其他問題。例如對增量數據的讀取方式,沒有 Kafka 的指針、RDS 的自增 ID 作為原生支持。但是我們可以通過對象存儲的key值的特殊設計來解決這一問題,下文的具體方案中會有詳細闡述。


2.4 中間存儲


在數據清洗過程中為了保證調試的魯棒性和效率的提升,我們一般會在重要節點存儲中間數據,例如非結構化數據第一次變成結構化的時候,一般會存儲一份中間數據,保證後續的讀取足夠高效。


通常中間存儲使用的是 HDFS。HDFS 具有的特性是高可用性和高擴展性,而這些特性對象存儲全部都占有,甚至還有更強大的 IO 性能。此外由於 HDFS 用 NameNode 來存儲文件元數據和文件尋址方式的原因,HDFS 對於大量小文件的讀寫存在性能瓶頸;而對象存儲單獨存儲文件元數據,對於不同類型、不同大小的中間存儲都提供了強大的支持。


基於對象存儲的 Hadoop-AWS、Hadoop-OSS 等支持包解決了各計算引擎讀取數據的方式,可以直接通過類 HDFS 的接口進行文件操作,與 HDFS 可以無縫切換,加上其幾乎無需運維的特性,對象存儲是離線數據處理中間存儲的最佳選擇之一。

03

整體架構

我們以 Apache Spark + 對象存儲的流程為例,介紹基於對象存儲的離線大數據處理架構。


整個處理過程分為以下幾個主要流程:


1、Spider 通過網絡採集,獲取到數據後存儲到對象存儲中

2、通過 Airflow 的調度將數據清洗任務提交到 Spark cluster

3、清洗任務讀取對象存儲中的原始採集數據,並將中間數據輸出到對象存儲

4、再通過中間若干個數據清洗任務,最終將數據輸出到 ElasticSearch

5、前後端通過對 ElasticSearch 的數據查詢結果向客戶展示分析後的有效數據



3.1 對象存儲詳細方案


3.1.1 原始數據存儲


存儲格式


採集端使用 JSON 格式存儲、Snappy方式壓縮數據,目的在於壓縮存儲所需的資源,Snappy 對 JSON 數據的壓縮效果不錯,可以大大節省存儲空間,並且 Spark 讀取 Snappy 數據不需要任何改造。


以下為壓縮測試的一些結果:


存儲路徑管理


儘管對象存儲沒有天然的層次結構,但是我們可以將相同類型的數據對象賦予相同前綴的Key值,得到不同的存儲路徑(對象 Key 值前綴)。


存儲路徑按照不同的數據種類分目錄進行存儲,例如某網站的評論數據可以放在 web/comment/ 路徑下,使用不同的路徑區分不同的數據表,目的很簡單,根據路徑可以確定單表內容,保證 JSON 數據不進行混合。


分布式爬蟲可以生成 UUID 作為文件名放入當前時間的文件夾,並發寫入性能可以大大提升。


時間游標管理


在同一類型的數據存儲路徑下,採用數據入庫時間來確定具體對象 Key 值, 例如:web/comment/2019/01/01/00/02/00/ ,利用路徑的方式給數據增加了時間游標的功能,可以通過路徑進行增量遍歷。

文件大小


單文件大小控制在 64 MB 左右,避免過小的分片,數據進入對象存儲之前進行合併處理是一個優秀的處理方式。


3.1.2 中間數據存儲


存儲格式


中間存儲一律使用 Parquet 格式,Parquet 格式壓縮率很高,節省空間,Spark 支持很好,讀取效率極高,豐富的數據格式可以支持絕大部分的數據結構。

存儲路徑管理


按照路徑區分中間數據表例如:web/comment_analyzed,web 代表資料庫名,而 comment_analyzed 代表被分析過的中間數據。

時間游標管理


離線數據的清洗有比較強的批次性,在採集數據的時候就會確定好相應的批次。


例如,如果需要 2019年12月 的數據,那麼就會在原始數據中增加 "batch_date": "2019-12-01" 的批次標誌。


中間數據清洗後,依照 web/comment_analyzed/batch_date=2019-12-01/ 的方式進行分區,Hadoop-AWS、Hadoop-OSS 等多數基於對象存儲的依賴包都支持基於對象前綴的自動分區發現。

文件大小


控制中間輸出文件的大小為 64 MB 左右,不宜過小也不宜過大。


其原因在於使用 Spark 進行數據處理的時候會按照文件的大小作為初始單個 partition 處理數據的大小。


過小的文件會造成讀取的 overhead 過大,從而讀取緩慢,而過大的文件大小會導致需要的 executor 內存大小暴漲。


3.2 Spark任務開發詳細方案


時間游標管理


由於使用對象前綴標記存儲類型和入庫時間,數據清洗過程中需要開發讀取對象存儲時間格式路徑的基礎功能,我們接入了Mysql 作為元數據記錄,記錄每個任務當前處理到的時間節點信息。


例如:在2020年1月1日我們處理到時間游標為 web/comment/2020/01/01/00/02/00/的數據,則在資料庫中記錄 2020-01-01 00:01:00 這個時間。


2020年1月2日再次進行任務清洗的時候,則將先從數據讀取這個時間,並從 web/comment/2020/01/01/00/02/01/ 開始數據的讀取。

處理步驟的拆分


原始數據和中間存儲採用不同的格式存儲數據,讀寫性能也有較大的差距,為此需要區分採集數據的首次清洗和後續的數據清洗。


採集數據的首次清洗具有比較高的複雜度,由於是從 JSON 數據讀取,並且可能是龐大的非結構化數據,處理效率較低,所以首次處理一般會使用追加的方式將原始數據轉存到以批次時間進行分區的對象存儲路徑中,例如:

web/comment_analyzed/batch_date=2019-01-01。


後續的數據清洗是靈活的,可以按照任務的複雜性進行步驟拆分,每個步驟最終都會產生一個中間數據,可以大大地提高數據處理效率。

04

實踐案例

接下來,以某電商平台的全量數據分析為案例,詳細介紹如何運用本文所提出的架構,高效率地實現 TB 級別電商大數據的處理和分析。


在本案例中,我們需要對電商平台上的全部商品進行銷售情況分析,需要計算商品的月度銷售量、銷售額等指標,並按行業、品牌、屬性等做不同維度的聚合。


採集端提供的原始數據有如下幾類:


(1) 商品屬性數據


包含商品ID、行業名稱、品牌名稱,以及商品細分參數,例如規格、類型、產地、包裝等。


(2) 商品價格數據


包含商品原始價格、折扣價格、促銷信息。


(3) 商品銷售量數據


包含商品總銷量、月度銷量。


(4) 商品評論數據


包含商品總評論數、好評數、中評數、差評數等信息。


在金融投資領域,信息不對稱是這一行業的本質特徵和競爭焦點,因而數據的及時性非常重要。在大數據時代,快速利用和挖掘信息的能力越來越成為各個金融資管機構競爭的焦點。因此,對於每月的新增批次數據,我們需要保證在月初的一天內完成對於上月數據的處理和分析。


4.1 難點分析


1、數據處理時限短:如上所述,該電商平台同期在線商品SKU量在10多億。為了滿足數據及時性要求,每批次的數據需要在月初的一天內完成數據採集、數據處理。


2、數據量大:計算月度銷量等指標數據所需的基礎數據有:上月總銷量、本月總銷量、商品當月價格、商品屬性、商品所屬行業品牌等。每一種基礎數據的量級都在 10 億級別。存儲格式為多行 JSON 文件+ Snappy 壓縮,每批次數據的空間占用量接近 1TB。


4.2 實現方案


針對 4.1 中的難點,我們以分品牌、分行業、分屬性的月度銷售量數據為分析目標,詳細介紹其中的處理細節。


數據預處理


預處理是指將採集中的一部分增量數據進行實時處理,而非等到完全採集完畢才處理,這樣可以有效減少最終計算時處理的數據量和計算量。


可進行預處理的計算任務如下圖所示:



預處理實施分為如下幾個步驟:


1、每日將所有爬取到的增量數據由 JSON 轉為 Parquet。

2、對某些可進行預計算的數據進行計算並保存處理後的中間數據。


例如,對於價格部分我們採用了拉鍊表的計算方式。可以每日進行一次拉鍊表計算,每次處理增量數據,這樣就不用到月末時處理一整個月的數據。


對於銷量部分,我們需要按照採集時間將銷量數據分片,這個操作也可以放到每日進行。


這部分的任務,如果放在最終計算步驟,會額外增加約 6h 的處理時間。

銷量數據計算


有了數據預處理的基礎,我們開始進行月度銷量數據的計算。通過 Airflow 可以將複雜的計算邏輯以圖的形式展示出來,使得依賴關係變得清晰可控,整體的計算邏輯如下圖所示:



商品銷售數據的計算主要分為如圖幾個部分:


1、價格處理


此部分主要對價格進行拉鍊表轉換、月度平均價格計算等處理。


考慮到價格是會變化的,因此價格數據的採集粒度是日度,每月的價格數據加在一起有300億條以上。但價格並不是每天都在變,而且大部分商品的價格變化並不頻繁,因此採用了拉鍊表的存儲方式,來減少最終處理的數據量。


考慮到無法採集到每一個商品的成交價格,因此需要計算月度平均價格來做銷售額計算。


2、銷量處理


此部分主要對銷量數據按照採集時間來做一個分片存儲,方便後續計算月度銷量。


3、商品行業、品牌、屬性處理


此部分主要從採集數據中抽取到商品的行業、品牌等分類。以及對商品的各種參數進行歸整。


由於單條數據體積較大,在格式解析,欄位提取上會耗費不少時間。


4、商品月度銷售額計算

此部分計算商品的月度銷售數據。月度銷售量即為本次採集到的的銷售量減去上月採集到的銷售量。但由於採集時間並非嚴格的每月1號,因此需要將銷售量進行一些細微的調整拆分,以獲取月度準確銷售量。然後根據價格拉鍊表獲取對應月度的平均價格,計算出銷售額。

5、行業、品牌等月度銷售聚合

需要對全部商品數據進行行業、品牌、店鋪、屬性等各個維度的聚合。

6、指標清洗

此部分需要按行業、按品牌將歷史全部的月度銷量轉換為時序數據,方便業務人員使用。

如上所述,出於數據量以及性能優化的考慮,將整個計算任務進行了細緻的拆分,並加入中間數據緩存以及任務並行度提升的優化措施:

1、計算步驟拆分

一般計算過程會產生很多中間結果,中間結果的持久化會帶來很多好處。當程序崩潰時,通過中間結果可以以最小的代價復現崩潰,從而更快速的發現 BUG。

當修復 BUG 後,也可以通過讀取中間數據進行之後的計算,從而節省恢復時間。當計算結果核查有問題時,可通過查詢中間數據,快速定位到出問題的數據點。

2、任務並行化拆分

很多計算的結果,既需要保存到資料庫、也需要作為下一步計算的輸入。由於數據量巨大,在保存到業務端資料庫比如 MySQL、ElasticSearch 時是非常耗時的。

此時通過將計算結果持久化到對象存儲。然後在下游拆分多個任務同時進行計算和存儲,也可節省將近 4 小時的時間。

在整個實現方案中,中間數據的存儲起到了非常重要的作用。對象存儲的低成本、高 IO 效率使得大規模使用中間數據成為可能。

最終,整套計算方案採用100台機器(4核32G)可以在 16h 內完成 1TB 量級電商數據的處理和分析,完全滿足業務應用上對於數據實時性的要求。

05

總結

針對 PB 級異構數據源的離線處理和分析的需求,本文介紹了一種基於低成本對象存儲的大數據處理方案,該技術方案是一個低成本、低運維、低學習成本的架構方案,同時具備很高的可用性和擴展性。基於該方案,我們以某電商平台的月度全量數據為應用案例,詳細介紹了運用該框架進行銷量數據統計的處理細節和重要優化點。目前熵簡科技的數據量已經達到了 3700TB 左右,該方案可以持續穩定的提供數據清洗、數據分析和數據融合的能力。

作者:熵簡大數據團隊

關鍵字: