當前位置:
首頁 > 知識 > Airflow:Python 工作流管理利器

Airflow:Python 工作流管理利器

Apache Airflow 是一個用於編排複雜計算工作流和數據處理流水線的開源工具。 如果您發現自己運行的是執行時間超長的 cron 腳本任務,或者是大數據的批處理任務,Airflow 可能是能幫助您解決目前困境的神器。本文將為那些想要尋找新的工具或者說不知道有這款工具的同學了解 Airflow 編寫工作線提供入門教程。

Airflow 工作流設計稱為有向非循環圖(DAG)。這意味著,在編寫工作流時,您應該考慮如何將你的任務劃分為多個可獨立執行的任務,然後,將這些任務合并為一個邏輯整體,將其組合成一個圖,從而實現我們的工作流結果,例如:

圖的形狀決定了你工作流的整體邏輯。 Airflow DAG 可以包括多個分支,您可以決定在工作流執行時要走哪些分支或者說跳過哪一個分支。這就為我們的工作創建了一個非常有彈性的設計,因為如果發生錯誤,每個任務可以重試多次,甚至可以完全停止,並通過重新啟動最後一個未完成的任務來恢復運行的工作流程。

在設計 Airflow 操作節點時,務必要記住,它們可能被執行不止一次。 每個任務應該是 冪等的,即具有多次應用的能力,而不會產生意想不到的後果。

Airflow 術語

以下是設計 Airflow 工作流程時使用的一些術語的簡要概述:

AirflowDAGs是很多Tasks的組合.

每個 Task 都被實現為一個

當一個 DAG 啟動的時候,Airflow 都將在資料庫中創建一個 記錄

當一個任務執行的時候,實際上是創建了一個 Task 實例運行,它運行在 DagRun 的上下文中。

是 Airflow 尋找 DAG 和插件的基準目錄。

環境準備

Airflow 是使用 Python 語言編寫的,這讓我們可以非常簡單得在機器上安裝。我這裡使用的是 Python3.5 版本的 Python,還在使用 Python2 的兄弟們,趕緊出坑吧,3 會讓你對 Python 更加痴迷的。雖然 Airflow 是支持 Python2 版本的,好像最低可以支持到 Python2.6,但是我牆裂推薦大家使用 Python3.接下來,我將使用 virtualenv 來管理開發環境,並且進行後續的一系列實驗。

安裝 Airflow

為了方便,我這裡單獨創建了一個 airflow 的用戶用於實驗,同時使用這個用戶的 home 目錄 作為 airflow 的工作目錄,如果你希望和我看到一樣的效果,那麼我希望是跟著我的步驟一起來:

這裡只是進入 virtualenv 環境,接下來才是安裝 airflow 的步驟,截止到我寫博客的時候,airflow 的最新版本是 1.8,所以我這裡就使用 1.8 的版本:

經過一段稍長的等待時間之後,我們的 airflow 應該是安裝成功了,在安裝過程我們可以看到,airflow 依賴於大量的其他庫,這個我們後續都會慢慢道來。現在是是否配置 airflow 的環境了。

第一個需要配置的就是 環境變數,這個是 airflow 工作的基礎,後續的 DAG 和 Plugin 都將以此為基礎展開,因為他們都是以 作為根目錄進行查找。根據我們之前的描述,我們的 HOME 目錄應該是 ,所以可以這麼設置:

哈哈,到這裡我們可以說一個最簡單的配置就算是完成了,來看點有用的吧,嘗試一下輸入 命令看看:

如果你看到了上面一般的輸出,那麼說明我們的 airflow 是安裝和配置成功的,同時,我們使用 命令看看,應該在 目錄上能夠發現以下兩個文件:

打開 文件你會發現裡面有很多配置項,而且大部分都是有默認值的,這個文件就是 airflow 的配置文件,在以後的詳解中我們會接觸到很多需要修改的配置,目前就以默認的配置來進行試驗。如果你現在就迫不及待得想自己修改著玩玩,那麼 Airflow Configure 這篇文檔可以幫助你了解各個配置項的含義。

初始化 Airflow 資料庫

可能你會有點震驚了,為啥要初始化資料庫?是的,因為 airflow 需要維護 DAG 內部的狀態,需要保存任務執行的歷史信息,這些都是放在資料庫裡面的,也就是說我們需要先在資料庫中創建表,但是,因為使用的是 Python,我們不需要自己使用原始的 SQL 來創建,airflow 為我們提供了方便的命令行,只需要簡單得執行:

這裡值得注意的是,默認的配置使用的是 SQLite,所以初始化知道會在本地出現一個 的資料庫文件,如果你想要使用其他資料庫(例如 MySQL),都是可以的,只需要修改一下配置,我們後續會講到:

Airflow Web 界面

Airflow 提供了多種交互方式,主要使用到的有兩種,分別是:命令行 和 Web UI。Airflow 的 Web UI 是通過 Flask 編寫的,要啟動起來也是很簡單,直接在AIRFLOW_HOME目錄運行這條命令:

然後你就可以通過瀏覽器看到效果了,默認的訪問埠是:8080,所以打開瀏覽器,訪問以下 URL:http://localhost:8080/admin,神奇的事情就這麼發生了,你將看到類似這樣的頁面:

第一個 DAG

從一開始就說了, Airflow 的兩個重大功能就是 DAG 和 Plugin,但是直到現在我們才開始講解 DAG。DAG 是離散數學中的一個概念,全稱我們稱之為:有向非循環圖(directed acyclic graphs)。圖的概念是由節點組成的,有向的意思就是說節點之間是有方向的,轉成工業術語我們可以說節點之間有依賴關係;非循環的意思就是說節點直接的依賴關係只能是單向的,不能出現 A 依賴於 B,B 依賴於 C,然後 C 又反過來依賴於 A 這樣的循環依賴關係。

那麼在 Airflow 中,圖的每個節點都是一個任務,可以是一條命令行(BashOperator),可以是一段 Python 腳本(PythonOperator)等等,然後這些節點根據依賴關係構成了一條流程,一個圖,稱為一個 DAG,每個 Dag 都是唯一的 DagId。

創建一個 DAG 也是很簡單得,首先需要在 AIRFLOW_HOME 目錄下創建一個 目錄,airflow 將會在這個目錄下去查找 DAG,所以,這裡我們先創建一個,創建完之後新建一個 文件:

然後,再來看下我們的 DAG 文件是怎麼寫的:

我們可以從 Web UI 上看到這個 DAG 的依賴情況:

這就定義了幾個任務節點,然後組成了一個 DAG,同時也可以發現,依賴關係是通過 來實現的,這只是一種方式,在後面我們將會看到一個更加簡便的方式。

讓 DAG 跑起來

為了讓 DAG 能夠運行,我們需要觸發 DAG 任務,這裡有幾種觸發的方式,但是,最天然的當屬定時器了,例如,在我們上面的任務中,可以發現設置了一個參數: ,也就是任務觸發的周期。但是,你光設置了周期是沒有用的,我們還需要有個調度器讓他調度起來,所以需要運行調度器:

我這裡使用的是LocalExecutor, Airflow 目前有三種執行器,分別是:

SequentialExecutor:順序得指定 DAG

LocalExecutor:使用本地進程執行 DAG

CeleryExecutor:使用 Celery 執行 DAG

其中第一種SequentialExecutor可以用來在開發調試階段使用,千萬不要在生成環境中使用。第二種和第三種可以用於生產也可以用於開發測試,但是,對於任務較多的,推薦使用第三種:CeleryExecutor

總結

本文從 Airflow 的環境安裝出發,簡單得介紹了一下如何使用 Airflow,但是本文的定位始終是一篇入門文章,對於 Airflow 的高級特性,在本博客中將會有大量的後續文章進行介紹,請大家自行搜索了解。

Reference

Airflow Tutorial

A Summer Intern s Journey into Airflow @ Agari

Get started developing workflows with Apache Airflow

題圖:pexels,CC0 授權。

點擊展開全文

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 Python 的精彩文章:

如何用Python做情感分析?
深入了解 Python 字元串對象的實現
Python 協程:概念及其用法
一步步學會Python高級編程
數據清洗要了命?這有一份手把手Python攻略

TAG:Python |

您可能感興趣

Windows 上的 SSH?使用 PowerShell Remoting 遠程管理 Windows 伺服器
TelephonyManager(電話管理器)
管理vRealize Automation的vPostgres資料庫
監控管理之Spring Boot Admin使用
在docker for win中使用portainer管理容器
MongoDB 管理工具: Rockmongo
Veritas收購雲數據管理公司fluid Operations AG
管理非凡的心智 Managing Brilliant Minds
Spring Boot 基礎教程 ( 三 ) :使用 Cloud Studio 在線編寫、管理 Spring Boot 應用
AudioManager(音頻管理器)
Mozilla推出兼容iOS的Face ID密碼管理器Firefox Lockbox
Kube:在VS Code中管理Helm charts
linux 使用supervisor管理開機啟動uwsgi
Oracle應用管理云:IMCS添加不同版本Weblogic entity的區別
使用Jira software+Structure實現大規模跨團隊項目管理
Kobe Bryant 歡迎 LeBron James 加入「Lakers 大家庭」並盛讚管理層
集群管理工具KafkaAdminClient——原理與示例
如何在Kubernetes中管理和操作Kafka集群
Apple 機器學習與 AI 負責人 John Giannandrea 晉陞最高管理團隊
GPaste:Gnome Shell 中優秀的剪貼板管理器