【OSSワークフローシステム】Airflowについてまとめる

Airflowの概要

OSSのワークフローシステムの一つ。スケジューリングやモニタリングのプラットフォームとして、データ基盤のETL処理で使われることが多い。

他のOSSワークフローシステムと比べてカスタム性が高く、情報も多いため、多くの会社で採用されている傾向にある。

主に以下の用途で使われることが多い。

  • ジョブの分散処理、依存関係処理
  • UIによる依存関係の可視化、操作が可能
  • Python(スクリプト)で処理の定義ができる
  • ログ管理が可能

メリット

  • 情報も多く、取り入れている会社も多い
  • ワークフローをコードで管理でき、機能性やカスタマイズ性も豊富
  • 簡単なコード(pythonベース)で実装可能
  • UIもわかりやすく、再集計もやりやすい
  • 将来管理が大変になって、なるべく工数削減したいとなった時には、cloud composerやMWAAにスムーズに移行することができ、工数もかからない

デメリット

  • コードを書く量は他のツールに比べて多くなるかも

DAG(有向非巡回グラフ)

DAG(有向非巡回グラフ)とは、Airflowでの依存関係を定義するときのグラフのこと。簡単にいうと、処理と順番が記述されたワークフローそのもののことを指す。

例えば、1から5までの処理(task)があったとき、以下のようなDAGをpythonで作成する。(task1⇨task2,3,4⇨task5の順で処理が実行される)

タスク実行のトリガー設定

Airflowでは、DAGの定義によって依存するタスクが完了後に、対象のタスクを実行するといったようなことができる。

ワークフローシステムを導入する最大の目的は、依存関係処理の実現だろう。

例えば、bigqueryのスケジューリングクエリを実行して処理を管理している場合、そのクエリ処理がどのくらいの時間で終わるかはっきりとはわからない。

大体の人がその処理実行から大体数分から数時間あけて、次の処理を行うようにするみたいな管理をしているのではないだろうか。

これは確実性に欠けるし、量が増えてきたら管理も大変になることは明確である。

Airflowでは定期実行(バッチ処理)が基本でありつつ、実行方法は目的に沿ってカスタマイズすることができる。例えばシステム不具合で取得できなかった日時分のデータをbackfillコマンドで取得することができるなど。

環境構築して動かしてみる

環境

  • MacOS
  • Docker 20.10.7
  • Airflow 2.2.2

どの環境でもできるようにdockerで構築をしていく。

【docker】基本的なコマンドと使い方

docker-composeを落としてくる

以下URLのREADME参照。

https://github.com/puckel/docker-airflow

 wget https://raw.githubusercontent.com/puckel/docker-airflow/master/docker-compose-CeleryExecutor.yml -P <落としたいローカルディレクトリを指定>

Docker起動

docker-composeでimageとコンテナを起動。

docker-compose -f docker-compose-CeleryExecutor.yml up -d

http://localhost:8080にアクセスするとAirflowが起動されている。

DAGを作成する

ルートディレクトリの/dags配下にDAGファイルを作成する。

test_bash_dag.py

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


# DAGを定義
default_args = {
    'owner': 'root',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'schedule_interval': timedelta(days=1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# DAGのインスタンスを作成
dag = DAG('test_bash_dag', default_args=default_args)

# DAGに紐づくタスクを作成

task1 = BashOperator(
    task_id='task1',
    bash_command='echo task1 exec',
    dag=dag)

task2 = BashOperator(
    task_id='task2',
    bash_command='echo task2 exec',
    dag=dag)

task3 = BashOperator(
    task_id='task3',
    bash_command='echo task3 exec "{{ params.message }}"',
    params={'message': 'All Completed'},
    dag=dag)

# 依存関係を定義
[task1, task2] >> task3

shellの実行をタスクに分けてDAGを作成。

task1と2完了⇨task3の順で実行される。

dags配下にソースを保存するとAirflow画面に作成したDAGが出てくるので、トリガーをONにする。

ONにすると、DAGが実行され、処理が成功したか失敗したか等のステータスがtree形式で見れるようになっている。

各タスクをクリックすればlogを見ることもできる。

DAGの書き方

Operator

BashOperatorやPythonOperatorなどタスクを定義するためのテンプレートで、Operatorを使用することでDAGを作成することができる。

また外部システムとの連携用のOperatorもあり、sshやmysqlなどのRDBMS、AWS、GCPなど多種多様なシステムと連携することができる。

Sensor

FileSensorやPythonSensorなど「条件を満たすまで待機する」というタスクを定義する際に利用する。まさにセンサー。

特定のファイルが格納されたときや特定の処理が実行された時など、イベント駆動型にしたいときに利用できそう。

失敗処理

trigger_ruleを用いることで、先行タスクのステータスに応じて処理の実行を制御することができる。

例えば、エラーを吐いた場合は以降の処理は全てスキップすることや、timeoutした時は失敗扱いにせず、以降の処理も実行するようにするといった設定も可能。