Apache Airflow的完整介紹

聞數起舞 發佈 2020-06-04T21:59:39+00:00

關鍵概念,安裝和DAG實際示例> Photo by Şahin Yeşilyaprak on Unsplash Airflow是用於自動化和安排任務和工作流程的工具。

關鍵概念,安裝和DAG實際示例

Airflow是用於自動化和安排任務和工作流程的工具。 如果您想以數據科學家,數據分析師或數據工程師的身份高效工作,那麼擁有一個可以自動執行要定期重複的流程的工具至關重要。 從提取,轉換和加載常規分析報告的數據到自動重新訓練機器學習模型,這可以是任何事情。

Airflow使您可以輕鬆地自動化主要由Python和SQL編寫的簡單到複雜的流程,並具有豐富的Web UI來可視化,監視和修復可能出現的任何問題。

下面的文章是對該工具的完整介紹。 我已經包含了從虛擬環境中安裝到以簡單易懂的步驟運行第一個dag的所有內容。

我將本教程分為6個部分,以使其易於理解,以便您可以跳過已經熟悉的部分。 包括以下步驟:

· 基本氣流概念。

· 如何在虛擬環境中設置Airflow安裝。

· 運行Airflow Web UI和調度程序。

· 常見的CLI命令列表。

· Web UI導覽。

· 創建一個真實的示例DAG。

1.基本概念

在討論Airflow的安裝和使用之前,我將簡要介紹該工具至關重要的幾個概念。

DAG

該工具的核心是DAG(有向無環圖)的概念。 DAG是您要在工作流中運行的一系列任務。 這可能包括通過SQL查詢提取數據,使用Python執行一些計算,然後將轉換後的數據加載到新表中。 在Airflow中,每個步驟都將作為DAG中的單獨任務編寫。

通過Airflow,您還可以指定任務之間的關係,任何依賴關係(例如,在運行任務之前已將數據加載到表中)以及應按順序運行任務。

DAG用Python編寫,並另存為.py文件。 該工具廣泛使用DAG_ID來協調DAG的運行。

DAG運行

我們具體說明了DAG應何時通過execute_date自動運行。 DAG按照指定的時間表運行(由CRON表達式定義),該時間表可以是每天,每周,每分鐘或幾乎任何其他時間間隔

Operator

Operator將要在每個任務中執行的操作封裝在DAG中。 Airflow有大量的內置操作員,它們可以執行特定任務,其中一些特定於平台。 此外,可以創建自己的自定義運算符。

2.安裝

我將為您提供在隔離的Pipenv環境中進行Airflow設置的個人設置。 如果您使用其他虛擬環境工具,則步驟可能會有所不同。 此設置的大部分靈感來自於出色的Stackoverflow線程。

對您的Airflow項目使用版本控制是一個好主意,因此第一步是在Github上創建一個存儲庫。 我叫我airflow_sandbox。 使用git clone" git web url"創建到本地環境的存儲庫克隆後。

從終端導航到目錄,例如 cd / path / to / my_airflow_directory。

進入正確的目錄後,我們將安裝pipenv環境以及特定版本的Python,Airflow本身和Flask,這是運行Airflow所必需的依賴項。 為了使一切正常工作,最好為所有安裝指定特定版本。

pipenv install --python=3.7 Flask==1.0.3 apache-airflow==1.10.3

氣流需要在本地系統上運行一個稱為AIRFLOW_HOME的位置。 如果未指定,則默認為您的路線目錄。 我更喜歡通過在.env文件中指定它來在我正在工作的項目目錄的路徑中設置Airflow。 為此,只需運行以下命令。

echo "AIRFLOW_HOME=${PWD}/airflow" >> .env

接下來,我們初始化pipenv環境。

pipenv shell

氣流需要運行資料庫後端。 默認設置為此使用SQLite資料庫,這對於學習和實驗來說是很好的選擇。 如果您想建立自己的資料庫後端,那麼氣流文檔會提供很好的指導。 初始化資料庫類型。

airflow initdb

最後,我們創建一個目錄來存儲我們的dag。

mkdir -p ${AIRFLOW_HOME}/dags/

就是說初始基本設置完成了。 您現在應該具有一個如下所示的項目結構。

3.運行Airflow

Airflow具有出色的Web UI,您可以在其中查看和監視您的問題。 要啟動Web伺服器以查看UI,只需運行以下CLI命令。 默認情況下,Airflow將使用埠8080,因為我已經使用該埠運行我指定8081的其他埠。

airflow webserver -p 8081

我們還需要啟動調度程序。

airflow scheduler

現在,如果我們導航到http:// localhost:8081 / admin /?showPaused = True。 我們將看到以下螢幕。

Airflow有一些顯示在用戶介面中的示例dag。 一旦開始創建自己的窗口,您可以通過單擊螢幕底部的"隱藏暫停的DAG"來隱藏它們。

4.基本的CLI命令

讓我們使用這些示例dag來瀏覽一些常見的Airflow CLI命令。

讓我們從教程dag中運行sleep任務。

airflow run tutorial sleep 2020-05-31

我們可以在教程DAG中列出任務。

bash-3.2$ airflow list_tasks tutorial

暫停此DAG。

airflow pause tutorial

取消暫停教程。

airflow unpause tutorial

回填(在過去的日期執行任務)。 指定dag_id(教程),開始日期(-s)和結束日期(-e)。

airflow backfill tutorial -s 2020-05-28 -e 2020-05-30

有關CLI命令的完整列表,請參見文檔中的此頁面。

5. Web UI

我們可以從Web UI監視,檢查和運行任務。 如果返回到Web伺服器,我們可以看到我們在教程DAG上運行的CLI命令的效果。 為了便於查看,我隱藏了暫停的dag。

我們可以通過多種方法來檢查DAGS的運行情況。

如果我們選擇樹狀視圖。

我們可以輕鬆查看哪些任務已運行,正在運行或已失敗。

我們還可以通過單擊小方塊從此處運行,清除或標記特定任務。

如果單擊"渲染"按鈕,我們可以查看已運行的代碼或命令。

通過"代碼"視圖,我們可以查看組成DAG的代碼。

Graph View是一種很好的方式可視化任務的排序或相關方式。

Web UI中的另一個重要區域是Admin。 在這裡,您可以定義與其他平台(如資料庫)的連接,並定義可重用的變量。

6.第一個DAG

我將在此處嘗試提供一個接近實際的DAG示例,以說明至少一種使用Airflow的方法,並介紹隨之而來的一些複雜性。

我將編寫一個Airflow DAG,它首先檢查BigQuery公共數據集中感興趣日期的數據是否存在,然後按日程將數據加載到我自己的私有項目中的表中。

BigQuery有一個免費的使用層,該層允許您每月查詢1TB數據,因此,如果您想自己嘗試一下,則可以以零成本進行。

BigQuery設置

為了同時使用Airflow和BigQuery,我們需要首先完成一些附加的設置步驟。

為了能夠通過Airflow查詢和加載BigQuery中的數據,我們需要首先授予Airflow所需的安全權限。

為此,您需要在Google Cloud Platform上創建一個服務帳戶。 這有點像創建一個有權訪問您的帳戶的用戶,但其目的是允許其他平台訪問。

首先,從Google Cloud Console導航到服務帳戶。 然後單擊創建服務帳戶按鈕。

接下來填寫顯示的表格。

在下一頁上,您需要選擇要授予的訪問級別。 我已選擇所有資源的編輯器,

關鍵字: