聯想基於Apache DolphinScheduler構建統一調度中心的應用實踐

datafuntalk 發佈 2024-04-09T09:53:19.239076+00:00

導讀 隨著業務不斷增長以及定時任務類型的多樣化,聯想內部需要一個統一的調度中心對任務生命周期進行管理。

導讀 隨著業務不斷增長以及定時任務類型的多樣化,聯想內部需要一個統一的調度中心對任務生命周期進行管理。Apache DolphinScheduler 是一個分布式、易擴展的可視化 DAG 工作流任務調度平台,致力於解決數據處理流程中錯綜複雜的依賴關係,使調度系統在數據處理流程中開箱即用。本次分享的主題是聯想基於 Apache DolphinScheduler 構建統一調度中心的應用實踐。

全文將圍繞以下四部分展開:

1. 背景需求

2. 為什麼選擇 Apache DolphinScheduler

3. Apache DolphinSheduler 在聯想的落地實踐

4. Apache DolphinSheduler 3.x 新特性 & Roadmap


分享嘉賓|李崗 聯想 資深數據架構優化工程師

編輯整理|張瑋

出品社區|DataFun


01

背景需求

在日常工作場景中,不管是後端開發還是數據分析,或是運維的工作中都會用到定時任務,比如,定時清理集群上某一個冗餘的日誌或定時執行數據備份,又或是發日報周報。在一開始任務量小的時候可以通過 crontab 或在 Spring 裡面開發定時任務,但當有幾百上千個定時任務時,就涉及到很多問題,比如,如何統一管理多個任務的生命周期以及任務間的相互依賴等,這時就需要一個統一的調度中心。

對於統一的調度中心,聯想內部當時收集了各個業務 Domain 的需求,主要分為三大類:

(1)支持任務的豐富性,除了日常需要的定時通知任務等功能,如日報周報、郵件通知等,也需要支持大數據領域的 ETL 任務和非大數據場景下的 HTTP 任務,具體包括數據集成、數據處理、調用 HTTP API 接口等。同時接口之間的請求參數也有依賴關係,這些都需要調度中心能夠支持。

(2)任務管理和編排,當任務越來越多,任務依賴關係變得越來越複雜時,就需要進行統一管理和編排,並對他們的執行情況做監控,看任務是否按照邏輯順序正確執行。此外一些在第三方業務系統里通過 Spring 開發的定時任務,也需要納入統一管理。

(3)關注 SLA,當任務量越來越大的時候要保證調度按時可靠地執行,並且任務的上下游依賴要能夠更快更實時的按序觸發。

通過對以上需求進行抽象分析,可以得出統一調度中心需要滿足以下六個特性:

➊ 高可靠性。穩定性是一個調度中心的重中之重,因此,調度系統要高可用,調度集群要支持分布式。

➋ 豐富易用。根據上面的需求,調度中心需要能夠支持多種任務,並且要對用戶足夠的友好,操作簡單。一方面可以基於 API 去調用,另一方面可以通過拖拉拽的方式方便數據分析人員使用。

➌ 輕量化。用於調度流程、啟動任務等的開銷要非常小。

➍ 業務隔離。當多個業務域接入到調度中心,需要對業務進行隔離,同時,任務執行時互不干擾。

➎ 調度性能線性擴展。當業務域加入到調度中心以後,任務的量級一定會指數級地增長。當任務量越來越大的時候,要能夠支持快速地線性擴展從而增加集群的能力。

➏ 業務易擴展。當有定製化的需求時,要能夠快速響應,這對調度中心的擴展性有很高的要求。

--

02

為什麼選擇 DolphinScheduler

根據聯想集團當時調研的需求以及背後的抽象,聯想 TDP 團隊對三個調度系統進行了調研,分別是 xxl-job、Apache DolphinScheduler、Apache Airflow。

1. 調度系統對比

如圖,是三個調度系統的對比。

定位上看,XXL-Job 是一個輕量級分布式的任務調度框架,在業務系統裡面需要去寫調度的邏輯,同時也能在 XXL-Job 上進行管理;Apache DolphinScheduler 是一個雲原生的分布式易擴展的可視化的工作流調度平台,致力於解決任務編排以及錯綜複雜的依賴關係;Airflow 跟 Dolphin 都是基於 DAG 的調度,但更偏重以編程的方式去編寫 DAG,然後對 DAG 進行調度和監控。

對任務類型的支持上看,XXL-Job 支持 Java、Shell、Python 這三種任務類型,支持的任務類型比較有限;Dolphin 和 Airflow 支持的任務類型相對較多,Dolphin 可以支持近 20 種業務類型,覆蓋大多數的業務場景;Airflow 也可以支持自定義地擴展。

對於可視化的能力,XXL-Job 可配置任務級聯觸發,但不支持 DAG;而 Dolphin-Scheduler 支持強大的 DAG 可視化拖拽,可以對其任務和依賴關係進行拖拽,並且 DolphinScheduler 在 2.x 也支持通過 Python 代碼去構建 DAG;Airflow 更多是通過 Python 代碼去構建 DAG。

對於調度中心擴展性的要求,當需要開發一些新的任務的時候,XXL-Job 需要通過 Java去開發執行器;DolphinScheduler 通過 SPI 實現自定義任務,更加容易擴展整個 DolphinScheduler 任務的支持能力;Airflow 也支持自定義任務類型,通過 Operator 來支持。在大數據這一塊對多租戶支持需求比較多,因為需要用到資源的調度,需要用到 Yarn 裡面的隊列。在 DolphinScheduler 裡面就會通過租戶對應到 Yarn 裡面的隊列,這樣就可以儘量地保證生產集群上資源的合理分配和較高的資源利用率。

對於集群擴展的支持,如何能夠線性地去增長,XXL-Job 和 Airflow 更多是在執行器上進行一個水平的擴展。而 DolphinScheduler 的架構設計是 Master 和 Worker,都是無中心化設計,所以都支持動態的伸縮。

易用性上可以看到,DolphinScheduler 強調開箱即用,具備很高的易用性。

社區活躍度來講,DolphinScheduler 和 Airflow 都是 Apache 頂級項目。

二次開發成本考慮,聯想 TDP 團隊更多的是採用 Java 語言。

綜上,我們最終選擇了 Apache DolphinScheduler 作為統一的調度中心。

2. DolphinScheduler 功能一覽

當時,聯想 TDP 團隊是基於 DolphinScheduler 2.x 版本,以下是 2.x 版本的一些功能。

(1) 2.xDAG 可視化

DAG 是一個有向無環圖,是 Dolphin 裡面一個非常重要的概念。如上圖可以看到,DAG 主要是由左側的任務組成,通過箭頭即可描述任務之間的關係。當多個任務之間有更複雜的關係時,就需要有可以描述任務間複雜依賴關係的邏輯任務。DolphinScheduler 的任務主要分為以下兩種任務類型。

① 邏輯任務:表述任務之間的依賴關係,比如 Switch 任務、子工作流任務和依賴的任務;通過這些任務可以描述任務之間、工作流之間的依賴關係;

② 物理任務:在 Worker 上具體執行的任務,比如 Shell、SQL、Spark、MR、HTTP 任務等。

(2) 工作流定義

繪製出的 DAG 保存後會生成工作流定義,可以對工作流定義進行多種操作,如,定時或手動啟動,也可以查看工作流定義的各版本信息。

(3) 工作流實例

無論是手動觸發或者定時觸發工作流定義,都會生成工作流實例。一個 DAG 裡面有多個任務,因此,一個工作流實例會產生多個任務實例。

工作流實例也可以進行各種操作,比如,對正在運行實例進行暫停或 KILL 的操作,當工作流實例變成終態時,比如是失敗的狀態,也可以對工作流實例進行失敗重跑的操作。

(4)任務實例

在任務實例的層面,可以查看任務的執行日誌,也可以對實例設定強制成功,這是一個人工干預的功能。

(5)任務狀態

任務有多種狀態。當 Master 生成任務實例的時候,任務默認為提交成功的狀態。當 Master 分發任務給 Worker,Worker 拿到後開始執行,會向 Master 匯報狀態。這個時候 Master 會把提交成功的任務狀態更改為運行中。當正在運行的工作流實例被暫停,任務就會顯示為暫停狀態。任務執行完成會顯示最終狀態,比如失敗、成功或 KILL。

當某個 Worker 節點掛掉時,任務需要容錯,這時需要容錯的狀態,由其他的 Worker 進行接管。當工作流實例被執行 KILL 操作,任務也會變成 KILL 狀態。

在 DolphinScheduler 3.0 的時候增加了在提交成功與運行中之間的一個狀態,分發狀態。

(6)2.x 新增主要 Feature

2.x 主要新增了五個特性:

  • 任務結果參數傳遞
  • 工作流間的血緣關係,工作流之間的依賴關係可以通過依賴任務或者子流程表達;
  • 增加數據同步主鍵 waterdrop、多分支等任務組件能力支持;
  • 工作流定義和任務關係拆分,更易通過 openAPI 生成工作流;
  • 將工作流定義和任務關係做了拆分,添加了工作流版本控制。

3. DolphinScheduler 架構設計

以下是 DolphinScheduler 整個架構的演變過程,以及它是如何支撐 10 萬級任務調度的。

(1)DolphinSchedule 1.2 架構

Apache DolphinScheduler 從一開始就採用了無中心化的設計,這樣可以保證調度集群性能的線性增長。

1.2 架構是 Apache DolphinScheduler 最初的架構,其中有兩個重要的組件,即 MasterServer WorkerServer。MasterServer 負責 DAG 任務的切分,將切分後的 DAG 任務生成工作流實例進行解析並提交。1.2 架構通過 zk 隊列存儲任務,Master 把任務存儲到 zk 隊列上,然後 Worker 再從 zk 隊列去取,這種方式消耗比較大,同時,Worker 與 DB 產生交互後會更改 Task 任務的狀態。因此,1.3 的架構對這兩點進行了改進。

(2)DolphinScheduler 1.3.x 架構

1.3.x 架構主要是對以上所說的兩點問題進行了改善。首先是在 Master 與 Worker 之間引入了 Netty 通信,去除 zk 隊列,大大減少了 Worker 獲取任務運行的延遲。其次是在 Master 與 Worker 之間進行多種任務分發策略,Worker 只負責執行任務,職責變得更單一。Worker 把任務所執行的結果匯報給 Master,由 Master 統一對狀態進行管理。

(3)ApacheDolphinScheduler 2.x 架構

在 2.x 架構裡面主要是對 Master 進行了一些重構,並引入了 SPI 插件化的設計,Master 可以與 API 直接進行通信。

① 2.x 新增可擴展能力

2.x 新增的可擴展能力主要是基於 SPI 插件化的設計。它不僅對任務進行了插件化,也對告警插件進行了 SPI 設計,包括註冊中心和數據源,後期還會對資源的中心進行插件化的設計。採用 SPI 可以使代碼變得更加的簡潔,可增加調度系統的可擴展的能力,也便於二次任務的改造與開發,在多個的任務插件裡面,各個任務插件的 pom 依賴互不影響。

② 2.0 重構

  • 去分布式鎖

2.0 重構大大提升了 2.0 的並發能力。雖然在 1.x 架構裡面是多個 Master 同時運行,但與 DB 進行交互的時候,會從資料庫里取得 command,但需要採用分布式鎖。2.0 去除了這個分布式鎖,當 Master 動態上下線的時候,會根據自己的分片編號計算屬於自己的槽位,然後根據槽位查詢資料庫取到屬於自己的 command。

  • Master 中線程池

在 1.x 的版本裡面,線程池的使用數量相對較多一點,在 2.0 重構主要是對 Master 中線程池進行了一些優化,引入了事件驅動的機制:

第一,MasterSchedulerService 線程去掉了分布式鎖,從 DB 中取到 command 生成工作流實例進行拆分、提交任務。

第二,在 WorkflowExecuteThread 線程里會維護一個事件的隊列,然後去不斷地處理變化,當有 Worker 發來的任務狀態變化或 Master 發來流程實例狀態的變化,WorkflowExecuteThread 線程就會進行相應處理。當 API 的界面上觸發了如 KILL 或暫停操作這樣的事件,會通過事件的機制,由 Master 統一去進行處理。

4. 社區發展

如圖是 DolphinScheduler 的社區發展指標。可以看到 DolphinScheduler 從進入孵化器,到從孵化器畢業,再到現在,社區發展的各項指標都是呈指數級的增長,是一個具備很強生命力的 Community。其中,社區的用戶數、開發者的數量、貢獻者的數量,以及 Committer PMC 都有幾何倍的增長,並且還在持續不斷地上升。這些都說明了 DolphinScheduler 是一個健康的可持續發展的社區。

基於以上原因,最終聯想選擇了 Apache DolphinScheduler,目前已接入了多個業務 Domain 到生產環境。

--

03

在聯想的落地實踐

在聯想內部,各個業務裡面有不同的需求,比如,不同的任務類型,以及任務間需要依賴、參數傳遞等。所以,在接入 DolphinScheduler 後進行一些功能的改造。

1. HTTP 任務參數傳遞

在一些非大數據的場景裡面更多是 HTTP 任務,並且任務的請求參數之間存在依賴的關係,所以就提出了 HTTP 參數傳遞的功能。當時在社區 2.x 的版本裡面只有 SQL 任務和 Shell 任務是支持參數傳遞,而 HTTP 任務不支持。

因此,對調度進行了一些改造,在 HTTP 的任務節點裡面的自定義的參數增加了 OUT 參數,也就是使任務的執行結果可以作為變量輸出,並增加欄位的解析。這樣,就可以在 API 接口返回的結果取某個欄位定義為輸出變量,下一個 HTTP 任務就可以把其作為請求參數使用。

2. Java 任務插件開發

第二個改造是 Java 任務插件的開發。在之前的一些業務部門有一些 XXL-Job 的任務,需要對這些任務進行遷移。這些作業主要是通過 Spring Bean 裡面的方法實現定時。對於 DolphinScheduler 而言,如果進行改造需要增加執行器的概念,由執行器和 Worker 進行通信,增加執行器與 Worker 之間的任務狀態的匯報,最後再由 Worker 把最終的狀態匯報給 Master。

這個實現的原理是通過提供 SDK 開發執行器,提供一個註解給業務人員添加到自己編寫的調度任務里,當程序啟動的時候會掃描註解下面所有的方法,並在運行過程中將所執行的狀態匯報給 Worker,由 Worker 匯報給 Master 進行統一管理。因此需要去抽象出一些必要的任務參數,並將其作為 Java 任務插件,從而實現在 DAG 上支持 Java 任務。

如圖,是 Java 任務插件的實現,在 DAG 上去拖拽 Java Method 任務時填上參數即可實現 Worker 對執行器的調用。

3. 項目全局參數

在 DolphinScheduler 里有很多參數的概念,方便對任務進行各種操作,比如,任務的自定義的參數、流程的全局的參數、啟動的參數、還有一些內置的參數。在操作 HTTP 任務時,在 Header 里有 token 欄位,並在多個工作流定義裡面都使用同樣的欄位,因此,就有了項目全局參數這樣的需求。

當一個普通任務運行時,任務內的參數根據參數替換優先級規則依次覆蓋,參數優先級從高到低為:

任務自定義參數 > 流程定義全局參數 > 項目全局參數

當手動啟動流程定義時,可以指定啟動參數,這時候參數替換的優先級會發生變化,啟動參數覆蓋流程定義的全局參數,優先級變為:

任務自定義參數 > 流程定義啟動參數 > 流程定義全局參數 > 項目全局參數

4. 內部認證接入

對於內部認證接入,接入了兩種內部的認證方式。

--

04

3.x 新特性 & Roadmap

1. 3.x 新特性

DolphinScheduler 在 3.x 版本裡增加了很多的新特性,以下主要對四個特性作介紹。

(1)UI 重構

在 3.x 版本中,DolphinScheduler 對整個 UI 進行了重構,使 UI 的響應速度都有了提升。

(2)數據質量校驗

數據質量校驗是在做數據處理中常用的功能。在 3.0 版本中,DolphinScheduler 內嵌了 10 個數據質量規則,都可以通過拖拉拽的方式內嵌到 DAG 里對單表或多表進行校驗,保證數據處理過程中的正確性和準確性。

(3)任務組

當調度里的任務達到了幾萬級十萬級的時候,就會出現大量並發,大量的任務在競爭資源。雖然 Dolphin 在一開始也對任務優先級進行了設計,Worker 里也有各自的線程池,但任務運行時還是可以同時搶占資源。

因此在 3.0 版本裡新增了任務組功能來控制任務實例的並發,針對每個業務設置任務組細分各自的隊列,將資源競爭下沉到各個任務組裡。

在創建任務組的時候可以根據業務區分給任務組命名,同時可以設置任務組的資源數量。創建任務時可以設置任務歸屬的任務組,當任務啟動時,Master 向 Worker 分發任務先判斷當前任務所在的任務組的資源數量是否已經被占滿,如果沒有則順利地發送,否則就處於等待狀態。

(4)支持多種工作流執行的策略

在 DolphinScheduler 之前的版本中,都默認工作流的執行策略為並行執行。當有多個定時任務或者手動提交多個任務運行時,任務之間是處於並行狀態。在 3.x 的版本裡面增加了多種工作流執行的策略的支持,可以選擇任務是並行執行或串行等待、串行拋棄、串行優先等。下面是串行等待狀態的示例:

如圖,這幾個實例同屬於一個工作流定義,如果是在之前版本中,實例是並行執行的,但在 3.x 版本裡,可以選擇設置串行等待此時就只有一個工作流實例是正在運行的,其他的實例為串行等待的狀態當正在運行的實例執行完後,實例狀態就變成串行恢復狀態,然後繼續執行。

2. 未來規劃

最後是關於聯想內部以及 Dolphinscheduler 社區未來的一些規劃。

在聯想內部主要是做了四個方向的規劃:

(1)根據內部的需求增加個性化功能。比如,對在線狀態的工作流定義加入定時的預覽,目前 Dolphinscheduler 只可以對下線狀態的工作流定義和在編輯中的工作流定義進行預覽,之後會對這裡進行調整。

(2)API 集成。聯想各個的業務團隊裡有著各自的 UI 界面使用,但都需要用到調度的一些特性和功能,比如任務編排、查看任務狀態、運行或者定時工作流定義等,這都需要 API 的集成。

(3)SDK 集成。目前 Dolphin 裡面可以通過 Python 代碼快速地生成 DAG,但在 Java SDK 裡面需要進行改造。

(4)數據質量。Dolphinscheduler 3.0 版本目前已經支持數據質量校驗規則的功能。

對於社區,個人的思考是未來的 Roadmap 更多集中在任務的觸發,任務的調度,任務的執行方面上

(1)定時觸發插件化。Dolphin 目前是使用分布式的 Quartz,這裡也可以考慮做插件化,比如說通過時間輪的設計,或者用戶自定義一個觸發邏輯。

(2)去 zk 實現高可用。目前 DolphinScheduler 服務註冊上也實現了 SPI 實現,可以使用 zk 或者 Etcd,任務隊列也已經不存在 zk 裡面了,不過如果使用 zk 作為註冊中心,在部署的時候也要部署 zk。如何在去掉 zk 的情況下也能保證註冊的一致性,是未來需要考慮的一個方向。

(3)DataOps。調度中心在整個 DataOps 裡面承擔了一個強有力的底座功能。如何使調度中心更好地去支持 DataOps 的生態?

(4)通用性功能、性能。Dolphinscheduler 本身是一個調度,回歸調度的一些核心的功能,會在性能上持續的優化,不斷地降低延時。

(5)雲原生。雲原生時代,調度也會在這個方向持續的增加支持能力。

目前 DolphinScheduler 可以支持 K8S 的部署,因為是雲原生的調度系統,自然也具備了彈性伸縮多雲的能力。

在 DEV 的版本已經支持容器的任務,目前思科已在內部實現,後期也會把功能逐步貢獻到社區。這樣可以支持 Flink 任務 pod 化,這樣在提交一些大數據作業的時候多一些選擇,可以選擇資源的調度是 Yarn 或 K8S。

最後,在 Dolphin 裡面還有很多的功能是需要貢獻的,比如數據血緣、K8S 的 Task pod 化,以及智能調度、實時任務 Flink 更好的支持等。後期也可能打破 DAG 的概念,把實時的任務單獨地作為一個 UI,因為實時任務沒有什麼中間狀態,一般只有啟動、運行,暫停和停止的操作也少。另外,包括流與批之間該如何進行串聯等問題,這些都是未來 Dolphinscheduler 的一些方向。除了代碼層面,在社區裡面也一些其他貢獻,比如對文檔進行翻譯、做測試、在 GitHub 上回答 Issue,這些都是為開源社區進行了貢獻,歡迎大家向 Apache Dolphinscheduler 進行貢獻。

Apache Dolphinscheduler 社區:https://dolphinscheduler.apache.org/en-us/community/

--

05

問答環節

Q:在雲原生這塊,DolphinScheduler 的支持能力和社區未來發展是怎麼計劃和考慮的呢?

A:Apache DolphinScheduler 目前已發布的版本是支持 K8S 部署的,在 DEV 的開發版本裡面是支持了容器的任務,支持管理若干個 K8S 集群環境,使得每一個 K8S 集群會有獨立的工作流和任務實例獨立運行,容器任務也是使用拖拉拽的方式,填上鏡像地址等任務參數然即可執行。在後期可能會對 Spark Streaming 或者 Flink 這種實時的任務、運行時間比較久的任務 pod 化,但並不是把所有的 Task 都 pod 化,因為這樣也會有頻繁的開銷。

今天的分享就到這裡,謝謝大家。


|分享嘉賓|


|DataFun新媒體矩陣|


|關於DataFun|

專注於大數據、人工智慧技術應用的分享與交流。發起於2017年,在北京、上海、深圳、杭州等城市舉辦超過100+線下和100+線上沙龍、論壇及峰會,已邀請超過2000位專家和學者參與分享。其公眾號 DataFunTalk 累計生產原創文章900+,百萬+閱讀,16萬+精準粉絲。

關鍵字: