B站增量數據湖探索與實踐

閃念基因 發佈 2023-02-08T14:38:06.800721+00:00

1. 背景眾所周知,越實時的數據越有價值。直播、推薦、審核等領域中有越來越多的場景需要近實時的數據來進行數據分析。我們在探索和實踐增量數據湖的過程中遇到許多痛點,如時效性、數據集成同步和批流一體的存儲介質不統一的問題。本文將介紹我們針對這些痛點所進行的思考與實踐方案。1.

1. 背景


眾所周知,越實時的數據越有價值。直播、推薦、審核等領域中有越來越多的場景需要近實時的數據來進行數據分析。我們在探索和實踐增量數據湖的過程中遇到許多痛點,如時效性、數據集成同步和批流一體的存儲介質不統一的問題。本文將介紹我們針對這些痛點所進行的思考與實踐方案。


1.1 時效性痛點


傳統數倉以小時/天級分區,數據完整才可查。然而,一些用戶並不需要數據完整,只需要最近的數據做一些趨勢分析。因此,現狀無法滿足用戶越來越強的數據時效性需求。傳統數倉ETL上一個任務完成後,才能開始下一個任務。即使是小時分區,層級處理越多,數據最終產出時效性越低。


1.2 數據集成同步痛點


業界成熟的方案是通過阿里巴巴 datax系統同步mysql 從庫數據到hive表。定期全量或者增量進行同步。需要單獨設置從庫以應對數據同步時對db請求的壓力。此外,db從庫成本高,不可忽視。

增量同步面臨如何解決歷史分區數據修改問題。如果一條數據被更新了,那麼僅通過增量同步,可能會在兩個分區里分別存在更新前和更新後的數據。用戶需要自行合併更新數據後,才能使用。


1.3 批流一體的存儲介質不統一


業務下游即有時效性要求場景,也有離線ETL場景。Flink sql可以統一流批計算過程,但無統一存儲,仍需要將實時、離線數據分開存儲在kafka、hdfs。



2. 思考與方案


增量化數據湖建設選型採用Flink + Hudi。我們需要數據湖的ACID 事務保障、流批讀寫操作的支持。並且,相對於 Iceberg 的設計,Hudi 對 Upsert 的支持設計之初的主要支持方案在upsert能力、小文件合併能力上有明顯優勢。Append性能在版本疊代中逐漸完善。活躍的社區在陸續疊代增量消費、流式消費能力。綜合對比,最終選擇基於Hudi搭建增量數據湖生態。



3. HUDI內核優化


Hudi cow模式每次寫入都需要進行合併,有io放大問題。Hudi 0.8起支持了mor模式,僅更新部分需要更新的數據文件。

但這會帶來數據質量問題,主要是一些極端情況下的數據丟失、數據重複、數據延遲問題。我們在實際測試生產過程中發現了一些問題,嘗試解決並反饋給社區。


3.1 底層數據可靠性優化


Hudi compaction大概代碼結構

StreamWriteOperatorCoordinator 算子中的notifycheckpointComplete邏輯:

  • 提交當前事務並將元數據deltacommit寫入hdfs
  • 調用調度生成compant 執行計劃並將compact.requested元數據寫入hdfs
  • 開啟下一次事務

CompactionPlanOperator 算子中的notifyCheckpointComplete邏輯:

  • 在0.9版本中通過sheduleCompation方法獲取最後一個compact執行計劃
  • 將compact.requested執行計劃轉化為CompactionPlanEvent下發下游

CompactFuncation

  • processElement 最後執行doCompaction 完成合併


3.1.1 log跳過壞塊bug,造成數據丟失

hudi log文件由一個個block 塊組成,一個log中可能包含多個deltacommit 寫入的塊信息,每個塊通過MAGIC進行分割,每個塊包含的內容如上圖所示。其中Block Header 會記錄每個塊的deltacommit instantTime,在compact 的過程中會掃描讀取需要合併的塊。HoodieLogFileReader是用來讀取log文件,會將log 文件轉化為一個個塊對象。首先會讀取MAGIC分隔,如果存在會讀下一位置塊的總大小,判斷是否超過當前文件總大小來界定是否為壞塊,檢測到壞塊會跳過壞塊內容並創建壞塊對象HoodieCorruptBlock,跳過的壞塊是從當前block total size 位置以後檢索MAGIC(#HUDI#),每次讀取1兆大小,一直讀取到下一個MAGIC分割為止,找到MAGIC後跳到當前MAGIC位置,後面可以接著在讀完整的塊信息。

上述提到壞塊的位置是從block total size 位置以後檢索的,當文件在極端情況下只寫入MAGIC內容還沒有寫入內容任務會出現異常,此時會寫入連續MAGIC,在讀取的過程中會把MAGIC當做Block Total size 讀取,在檢索的過程中會將下一個正常的塊給跳過,當成壞塊返回。在合併的過程中這部分塊數據就丟失了。這裡跳過的壞塊位置應該從MAGIC之後開始檢索而不是block total size之後,讀取時出現連續MAGIC就不會跳過正常塊。

相關pr:https://github.com/apache/hudi/pull/4015


3.1.2 compaction和回滾的log合併,導致數據重複


在HoodieMergedLogRecordScanner 會掃描有效的塊信息,極端情況下log 文件寫入的塊寫入是完整的,但是deltacommit元數據提交前任務失敗,並且log中的塊信息也比deltacommit元數據的instant Time 還小,此時會被認為是有效的塊被合併。因為deltacommit元數據沒有寫入成功,check point 重新啟動將之前的數據進行回放。回放時被分配的flieId可能不同,當這部分數據被合併會發現數據重複並且fileId不一樣現象。

在掃描log是否為效塊時,如果當前時間線比現有沒歸檔的時間線的元數據小的時候,加入已經歸檔的時間線,進行協同判斷是否為有效塊,不再只將未歸檔的時間線作為篩選條件。

相關pr:https://github.com/apache/hudi/pull/5052


3.1.3 數據非連續場景,最後一次數據不會觸發compaction,導致RO表數據延遲


上述流程中調度Compaction 執行計劃必須在上一次事務提交成功後才會觸發,如果一段數據都沒有數據寫入,compact 即使滿足時間條件或者commit個數條件都不會形成compact 執行計劃。上游沒有執行計劃下游Compaction 算子不會觸發合併,用戶在查詢ro表,部分數據會一直查詢不到。

去掉必須commitInstant 提交成功才調度生成compact執行計劃的綁定,每次checkpoint 後都檢查是否滿足條件觸發並生成Compaction 執行計劃,避免最新數據無法被合併。當沒有新數據寫入的場景下元數據只有deltacommit.requested和deltacommit.inflight元數據不能直接用當前時間為compact instantTime。上游可能隨時寫入數據避免合併沒有寫完的數據,在生成compact執行計劃也會檢查元數據的deltacommit 和compact 元數據避免出現合併沒寫完的數據。此時compact instantTime可以為最近沒完成的deltacommit instantTime 和最新完成compact 之間的時間。這樣生成compaction執行計劃元數據下游完成compact 合併就不會延遲了。

相關pr:https://github.com/apache/hudi/pull/3928


3.1.4 CompactionPlanOperator每次只取最後一次compaction,導致數據延遲


在StreamWriteOperatorCoordinator算子notifyCheckpointComplete方法中生產compact執行計劃,在CompactionPlanOperator 算子notifyCheckpointComplete方法中消費執行計劃下發操作合併數據。在極端情況下如果設置compact策略為一個commit就觸發compact合併操作,這樣在兩個算子notifyCheckpointComplete中會不斷的生產和消費compact 執行計劃,一旦消費一端的compact出現異常任務失敗,這樣會堆積很多的compact.requested執行計劃,而每次CompactionPlanOperator只會獲取一個執行計劃元數據,這樣數據會產生堆積和延遲,總有一部分執行計劃無法執行。

在0.8版本取最後一次的compaction執行計劃,這樣會新的commit一直在合併,老的數據一直無法合併造成丟失數據的假象。後續社區改為獲取最新的一次,如果下游出現某種原因的失敗導致compact執行計劃擠壓,數據延遲會越來越大。

CompactionPlanOperator獲取所有的compact 執行計劃轉化為CompactionPlanEvent下發下游,將CompactFunction 方法改為默認同步模式。異步模式中底層使用newSingleThreadExecutor線程池避免在同步的過程隊列持有大量對象。

相關pr:https://issues.apache.org/jira/browse/HUDI-3618


3.1.5 log中沒有符合時間線塊,parquet 文件重新生成,之前記錄丟失


compact 執行計劃包含多個HoodieCompactionOperation,每個HoodieCompactionOperation包含log文件和parquet 文件,但可能也只有logFlies 說明只有新增數據。在做compact 合併時會獲取HoodieCompactionOperation中的log文件和parquet 文件信息,構建HoodieMergedLogRecordScanner將log文件中符合合併要求的塊數據刷入ExternalSpillableMap中,在merge 階段根據parquet 數據和ExternalSpillableMap的數據比較合併形成新的parquet文件,新文件的instantTime為compact 的instantTime。

在HoodieMergedLogRecordScanner中掃描log file文件中,需要符合時間線要求的log 塊信息,如果沒符合要求的塊被掃描到,後續的merge操作不會運行,新的parquet 文件版本也沒有。下一次compact 的執行計劃獲取的FileSlice只會有log 文件而沒parquet文件,在執行compact runMerge 會當新增操作寫入,沒有和之前合併parquet數據合併之前的數據全部丟失,新parquet 文件中只有下一次log 產生的數據,導致數據丟失。

不論掃描是否符合要求,塊信息都強制寫入新的parquet文件,這樣下一次compact合併執行計劃中獲取FileSlice都會有log 文件和parquet文件,可以正常進行handleUpdate 合併,保證數據不丟失。


3.2 Table Service優化


3.2.1 獨立的table service -- compaction外掛


背景:

目前社區提供了多種表服務方案,但實際生產應用中,尤其是平台化過程中,會面臨多種問題。


原方案分析:

為了對比各方案的特點,我們基於hudi v0.9提供的表服務進行分析。

首先,我們將表服務拆解為調度+執行2階段的過程。

根據調度後是否立刻執行,可以將調度分為inline調度(即調度後立馬執行)與async調度(即調度後只生成plan,不立馬執行)兩種,

根據執行調用的方式,可分為sync執行(同步執行)、async執行(通過相應service異步執行,如AsyncCleanService)兩種,

此外,相對於寫入job,表服務作為一種數據編排job,本質上是區別於寫入job的,根據這些服務是否內嵌在寫入job中,我們將其稱為內嵌模式和standalone模式兩種模式。


以下,對社區提供的Flink on Hudi的多種表服務方案進行分析:

方案一 內嵌同步模式:在ingestion作業中,inline schedule + sync compact/cluster & inline schedule clean + async clean


該模式的問題很明顯:每次寫入後,立即通過內聯調度並執行compact作業,完成後才開始新的instant,在流程上即直接影響數據寫入的性能,在實際生產中不會採用。

方案二 內嵌異步模式:在ingestion作業中, async schedule + sync/async compact/cluster && inline schedule + sync/async clean


不同於方案一,該模式將資源消耗較大的compact/cluster等操作異步化至專門算子處理,ingestion流程僅保留了輕量的調度操作,對clean操作增加了同步/異步選擇。

但依舊存在缺點,ingestion作業的流式處理,疊加上表服務的間歇性批處理,對資源消耗曲線新增造成很多衝激毛刺,甚至是很多作業oom的元兇,使得作業配置時不得不預留足夠多資源,造成高優先資源閒時浪費。

方案三 standalone模式:ingestion作業 + compact/cluster作業組合

目前社區該方案尚不完善,其中寫入作業流程參考方案一,對compact/cluster/clean等action提供單獨的編排作業,以compact為例,HoodieFlinkCompactor的流程如下:


該方案通過抽象出單獨的數據編排作業,從作業級別隔離使用,克服了方案二的弊端,從平台化的角度看,符合我們的需求。

選定方向後,就需要面對該方案目前的諸多缺陷,包括如下幾點:

1. 在單writer模型下,編排作業的schedule模式不可用,會有的timeline一致性問題,導致數據丟失。

相對於ingestion作業,compact/cluster job本質上是另一個writer,多writer處於並發下,在無鎖狀態下timeline一致性是無法保證的,極端情況下會出現丟數據的問題,如下圖所示:

2. 寫入作業和編排作業沒有standalone模式下的協同能力。

首先,是clean action的問題,雖然hudi內核能力已經健全,但目前表服務層面僅暴露出inline調度+執行的方法,導致無論寫入還是編排作業都會包含clean,架構上過於混亂與不可控;

其次,編排作業CompactPlanSource與內嵌模式寫入作業的CompactPlan是兩個不同算子,dag未保持線性,不利於不同模式的切換;

此外,還存在作業編排調度不具備接收外部策略的能力,無法進行平台化,集成公司智能調度、專家診斷等系統等問題。


優化方案:

首先,解決timeline一致性問題,目前hudi社區已經有occ(樂觀並發控制)運行模式的支持,引入了分布式鎖(hive metastore、zookeeper)。但flink模塊的相關支持尚在初始階段中,我們內部也在進行相應應用測試,但發現距生產應用尚有諸多問題需要解決。

由於調度操作本身較為輕量,本期暫時把表服務scheduler保留在寫入作業中,仍舊保持以單writer模型運行,以規避多writer問題。

其次,針對性優化hudi底層的表服務調度機制,將clean action也拆解為調度+執行的的使用範式,通過inlineScheduleEnabled配置,默認為true進行後向兼容,在standalone模式下,inlineScheduleEnabled為false。

然後,重構寫入作業與編排作業,完善對3種運行模式的支持。具體包括:

對寫入作業的表服務scheduler優化,提供DynamicConfigProvider支持外部策略集成;重寫Clean算子,支持多種運行模式的切換;

對編排作業,重構作業使作業dag與內嵌模式線性一致;支持單instant,全批以及service常駐模式;優化寫入與編排作業間的配置傳遞,使其達到託管任務的要求任意啟停;

重構後的寫入與編排作業(以compact為例)如下:


3.2.2 metaStore解決分區ready問題


增量化數倉,需要支持近實時業務場景分鐘級數據可見性,需要在寫入數據時就創建hiveMetaStore分區信息。

而離線依賴建hiveMetaStore分區即數據完整的語義。如何解決是一個問題。

任務調度,依賴調度系統。在數據ready後,通知調度系統,可以進行下游任務調度。與建分區討論關係不大。


離線依賴hiveMetaStore問題,我們通過改造 hiveMetaStore ,賦予分區一個新的commit屬性,若數據未ready 則commit為false,分區不可見。保持原有語義不變。

對於adhoc來說,帶hint includeUnCommit=true標識查詢,可在數據未完成時查詢到數據。

對於離線來說,當分區的commit屬性被置為true,才能查到分區。滿足分區可見即數據完整的語義。

對於flink job來說,在數據第一次寫入時,創建分區,並賦予分區commit=false標籤,使得adhoc可以查到最新寫入的數據。

在處理完分區數據,判斷分區數據ready後,更新分區commit=true。此時,數據ready,分區對離線可見,滿足「分區可見即數據完整」的語義。



4. 場景落地實踐


4.1 增量化數倉


傳統數倉TL上一個任務完成後,才能開始下一個任務。即使是小時分區,層級處理越多,數據最終產出時效性越低。

採用增量計算方式,每次計算讀取上一次增量。這樣當上游數據完整後,只需要額外計算最後一次增量的時間即可完成,可以提升數據完成時效性。

同時在第一批數據寫入到ods層後就可增量計算至下一層直至產出,數據即可見,大大提升數據可見時效性。


具體實現方式是:通過hudi source,flink增量消費hudi數據。支持數倉跨層增量計算,如ods → dwd → ads 都使用hudi串聯。支持同其他數據源做join、groupby,最終產出繼續落hudi。

對於審核數據等有較高時效性訴求,可以採用此方案加速數據產出,提升數據可見時效性。


4.2 CDC到HUDI


4.2.1 面臨的問題


1. 原有datax同步方式 簡單來說就是 select * ,對mysql來說是慢查詢,有阻塞業務庫風險,所以需要單獨開闢mysql從庫滿足入倉需求,有較高的mysql從庫機器成本,是降本增效的對象之一。

2. 原有同步方式不能滿足日益增加的時效性需求,僅能支持天/小時同步,無法支持到分鐘級數據可見粒度。

3. 原有同步方式落hive表,不具備update能力,如果一條記錄經過update,則可能在兩個以mtime為時間分區都存在此數據,業務使用還需要做去重,使用成本較高。


4.2.2 解決思路


通過flink cdc消費mysql增量+全量數據,分chunk進行select,無需單獨為入倉開闢mysql從庫。

落hudi支持update、delete,相當於hudi表是mysql表的鏡像。

同時支持分鐘級可見,滿足業務時效性訴求。


4.2.3 整體架構


一個db庫用一個flink cdc job進行mysql數據同步,一張表的數據分流到一個kafka topic中,由一個flink job消費一個kafka topic,落到hudi表中。


4.2.4 數據質量保障 - 不丟不重


flink cdc source

簡單來說就是:全量 + 增量 通過changlog stream 方式將數據變更傳遞給下游。

全量階段:分chunk讀取, select * + binlog修正,無鎖的將全量數據讀出並傳遞給下游。

增量階段:偽裝成為一個從庫,讀取binlog數據傳遞給下游。

flink

通過flink checkpoint機制,將處理完成的數據位點記錄到checkpoint中,如果後續發生異常則從checkpoint可保證數據不丟不重。

kafka

kafka client開啟ack = all ,當所有副本都接收到數據後,才ack,保證數據不丟。

kafka server 保證replicas大於1,避免髒選舉。

這裡不會開啟kafka事務(成本較高),保證at least once 即可。由下游hudi做去重。

hudi sink

hudi sink同樣基於flink checkpoint實現類似二階段提交方式,實現數據寫hudi表不丟失。

通過增加由flink cdc生成的單調遞增的「版本欄位」進行比較,單條記錄版本高的寫入,低的捨棄。同時解決去重和亂序消費問題。


4.2.5 欄位變更


mysql業務庫進行了欄位變更、新增欄位怎麼辦?


面臨的問題

1. 數據平台新增欄位有安全准入問題,需要用戶確認,是否需要加密入倉。

2. 欄位類型變更,需要用戶確認下游任務是否兼容。

3. hudi的column evalution能力有限,比如int轉string類型就無法支持。

解決思路

與dba約定,部分變更支持自動審批(如新增欄位、int類型轉long等)通過。並且異步通知berserker(b站大數據平台系統),由berserker變更①hudi表信息,以及②更新flink job信息。

超出約定變更部分(如int轉varchar等),走人工審批,需要berserker確認① hudi表變更完成、②寫入hudi的flink job變更完成後,再放行mysql ddl變更工單。

我們改造了 Flink cdc job,可以感知mysql欄位變更,向下游kafka發送變更後數據,不受審批約束,將變更後的數據暫存在kafka topic中,此kafka對用戶不可見。下游寫入hudi任務不變更照常消費,不寫入新增欄位,用戶確認數據可入倉後,再從kafka回放數據,補充寫入新增欄位。

方案


mysql欄位類型和hudi類型存在不對應情況。flink job 消費kafka 定義欄位類型和 hudi 表定義欄位不對應,需要berserker在拼flink sql時候,額外拼入轉換的邏輯。

Flink cdc sql 自動感知欄位變更改造

flink cdc原生sql是需要定義mysql表的欄位信息的,那麼當mysql出現欄位變更時,是必然無法做到自動感知,並傳遞變更後數據給下游的。

原生flink cdc source會對所有監聽到的數據在反序列化時根據sql ddl定義做column轉換和解析,以row的形式傳給下游。

我們在cdc-source中新增了一種的format方式:changelog bytes序列化方式。該format在將數據反序列化時在不再進行column轉換和解析,而是將所有column直接轉換為changelog-json二進位傳輸,外層將該二進位數據直接封裝成row再傳給下游。對下游透明,下游hudi在消費kafka數據的時候可以直接通過changelog-json反序列化進行數據解析。

並且由於該改動減少了一次column的轉換和解析工作,通過實際測試下來發現除自動感知schema變更外還能提升1倍的吞吐。


4.3 實時DQC


dqc kafka監控存在幾個痛點:

  • 基於kafka的實時dqc很難做同比環比的指標判斷。
  • dqc實時鏈路是單獨的開發的,和離線dqc不通用。維護成本高。
  • 針對同一條流多個監控規則,是需要設立多個flink job,每個job計算一個指標。不能復用,資源開銷大。


架構


將kafka數據dump到hudi表後,提供dqc數據校驗。不影響生產秒級/亞秒級數據時效,又可以解決以上痛點。

hudi提供分鐘級的監控,可以滿足實時dqc監控訴求。時間過短,可能反而會因為數據波動產生誤告警。

hudi以hive表的形式呈現,使得實時dqc可以和離線dqc邏輯一致,可以很容易的進行同環比告警,易於開發維護。

實時DQC on Hudi 使得實時鏈路數據變得更易觀測。


4.4 實時物化


背景

有些業務方需要對實時產出的數據進行一個秒級的聚合查詢。

如實時看板需求,需要一分鐘一個數據點來展示DAU曲線,等多指標聚合查詢場景。

同時結果數據要寫入update存儲,實時更新。

難點

在較大數據規模下,基於明細產出幾十上百個聚合計算結果,要求秒級返回,幾乎不可能。

目前公司內支持update類型的存儲主要是redis/mysql,計算結果導入意味著數據出倉,脫離了hdfs存儲體系,同時也要使用對應的client進行查詢,開發成本較高。

現有的hdfs體系內計算加速方案如物化、預計算大都是基於離線場景,對實時數據提供物化查詢能力較弱。

目標

支持hdfs體系內的update存儲。讓數據無需出倉導入外部存儲,可以直接使用olap引擎高效查詢。

通過sql就可以簡單定義實時物化表。查詢時通過sql解析,命中物化表查詢則可秒級返回多個聚合查詢結果。

方案

基於flink + hudi提供實時物化的能力。

通過sql自定義物化邏輯到基於hudi的物化表。將明細數據寫入明細hudi表中,並拉起一個flink job 進行實時聚合計算,將計算結果upsert到物化的hudi表中。

在查詢時通過sql解析,如果規則命中物化表,則查詢物化表中的數據,從而達到加速查詢的效果。



5. 未來展望


5.1 HUDI內核能力增強及穩定性優化

Hudi timeline支持 樂觀鎖解決並發衝突,支持多流同時寫一張表。從底層支持新增數據和回補數據同時寫入hudi。

支持更豐富的schema evalution。避免重新建表、重新導入數據的繁重操作。

Hudi meta server,統一實時表離線表,支持instance版本等信息。支持flink sql上使用time travel,滿足取數據快照等訴求

Hudi manager 根據不同的表按需調配compaction、clustering、clean。用於離線ETL的表,低峰期進行compaction,資源上削峰填谷。對於近線分析的表,積極compaction以及clustering,減少查詢攝取文件數,提升查詢速度。


5.2 切換弱實時場景從Kafka到HUDI

在弱實時場景上實現流批統一存儲。Kafka對於突發流量以及拉取歷史數據達到性能瓶頸時,難易緊急擴容分攤讀寫負載。可以將分鐘級的弱實時使用場景,從Kafka切換到HUDI,利用HUDI可讀取增量數據的能力,滿足業務需求,並且HUDI基於分布式文件系統可快速擴容副本的能力,滿足緊急擴容的需求。

作者:

本期作者

周暉棟-嗶哩嗶哩資深開發工程師

目前主要負責B站實時團隊增量數倉、Hudi數據湖方向。

黃靖-嗶哩嗶哩資深開發工程師

專注於實時計算相關大數據技術,目前負責Hudi數據湖在B站的建設和應用。

陳世治-嗶哩嗶哩資深開發工程師

負責B站實時湖倉一體架構的建設,專注於Flink與Hudi生態結合的實時數據湖實踐與落地。

來源:微信公眾號:嗶哩嗶哩技術

出處:https://mp.weixin.qq.com/s/pjZAjgHF-HdZNjr7LfY9JA

關鍵字: