
Digdagとは
TreasureData社がオープンソースで公開しているツールで、依存関係のある複数のタスクを実行するワークフローエンジンです。
プログラマではなくてもわかりやすいよう、YAMLに対し、DSLにて、DAGの構造で、ワークフローを定義することができます。
ワークフローエンジンに必要な、以下の機能が備わっています。
- タスクの依存関係
- スケジューリング
- エラー処理
- リトライ
- メール通知
- タスクの並列実行
- タスク実行ログの収集
Github: https://github.com/treasure-data/digdag/
Document: http://docs.digdag.io/
導入方法
インストールからジョブを設定するところまでをみていきます。
1.動作環境
動作環境は下記になります。
- Azureu上仮想マシンでCentOS 6.8
- jdk-8u112-linux-x64.rpm
- Digdag 0.8.22
2.インストールと起動
Digdagをインストールします。
[code lang=bash]
$ curl -o ~/bin/digdag –create-dirs -L "https://dl.digdag.io/digdag-latest"
$ chmod +x ~/bin/digdag
$ echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc
[/code]
次に、Digdagをサーバーとして起動できるよう、「~/.config/digdag/」直下に、以下の設定ファイルを2つ作成します。
「digdag.properties」の作成例
[code lang=text]
database.type=h2
database.path=/home/cent/bin/test.db
digdag.secret-access-policy-file=/home/cent/.config/digdag/secret-access-policy.yaml
#「http://docs.digdag.io/command_reference.html#secret-encryption-key」と同じ設定を指定
digdag.secret-encryption-key=MDEyMzQ1Njc4OTAxMjM0NQ==
[/code]
「secret-access-policy.yaml」の作成
以下のリンク先と同じ設定を指定する
それから、作成後に以下のコマンドでDigdagのサーバを起動します。
[code lang=bash]
cd ~/bin/
digdag server -c ~/.config/digdag/digdag.properties -L ~/bin/logs/server.log -l debug -O ~/bin/logs -A ~/bin/logs &
[/code]
3.ワークフローの作成
TreasureDataにある2つのテーブルにて、一方のテーブルに、他方のテーブルからクエリ実行で取得したレコードを挿入するワークフローを作成します。
ワークフローには、以下のタスクを定義します。
- レコード挿入前に、TreasureDataにあるテーブルのカウントを取得するジョブクエリを発行する
- 前に取得したカウントを、メール本文を定義したテンプレートを読込み、メール送信する
- TreasureDataに保存してあるジョブクエリを実行する(ジョブクエリは、一方のテーブルに、他方のテーブルにクエリを実行して取得したレコードを挿入するクエリ)
- レコード挿入後に、TreasureDataにあるテーブルのカウントを取得するジョブクエリを発行する
- 前に取得したカウントを、メール本文を定義したテンプレートを読込み、メール送信する
ワークフロー実装例「spike.dig」
[code lang=text]
timezone: Asia/Tokyo
schedule:
cron>: 30 14 7 12 *
_export:
td:
database: spikedb
mail:
port: 465
from: ""
to: [""]
debug: true
_error:
mail>: ./tmpl/err.txt
subject: 【digdag mail test】err mail
+step1:
td>: ./sql/step1.sql
store_last_results: true
engine: hive
+step1-1:
mail>: ./tmpl/info_step1.txt
subject: 【digdag mail test】step1 info mail
+step2:
td_run>: spikedb_spiketbl002_select _all
+step3:
td>: ./sql/step3.sql
store_last_results: true
engine: hive
+step3-1:
mail>: ./tmpl/info_step3.txt
subject: 【digdag mail test】step3 info mail
[/code]
クエリ作成例「step1.sql」
[code lang=sql]
select count(*) AS cnt1 from spiketbl001
[/code]
メール本文テンプレート作成例「info_step1.txt」
[code lang=text]
${moment().format("YYYYMMDD HH:mm:ss")}
———–
information
———–
step1 sql count=${td.last_results.cnt1}
[/code]
なお、今回サンプルとして作成するワークフローのDigdagプロジェクト「spike」は、「~/bin/」直下に作成し、フォルダツリーは以下のようにします。
[code lang=bash]
$ tree spike
spike/
├─spike.dig
│
├─sql
│ ├── step1.sql
│ └── step3.sql
│
└─tmpl
├── err.txt
├── info_step1.txt
└── info_step3.txt
[/code]
次に、以下のコマンドにて、サーバに今回作成したワークフローを登録します。コマンドは、カレントディレクトリをプロジェクトフォルダに移動してから実行します。
[code lang=bash]
cd ~/bin/spike
digdag push spike
[/code]
さらに、以下のコマンドにて、サーバに今回作成したワークフロー中で使用される「secrets」パラメータの設定を登録します。パラメータの設定は、json形式でファイルに定義できるため、作成した定義ファイル「secrets.json」をコマンド引数に指定しています。
[code lang=bash]
cd ~/bin/
digdag secrets –project spike –set @secrets.json
[/code]
スケジュールの登録状況は、以下のコマンドで確認できます。
[code lang=bash]
cd ~/bin/spike
digdag schedules
[/code]
当該ワークフローの実行状況は、以下のコマンドで確認できます。
[code lang=bash]
digdag sessions spike
[/code]
まとめ
Digdagは、以下の特徴があります。
- DAG形式のワークフローが作成できる
- フロー制御や外部のスクリプト呼出しができプログラマブルなワークフローにできる
- Treasure Dataの機能が使える
- ワークフローで使用する環境変数などのパラメータを埋込みではなく外出しにできる
- ワークフローの実行時間超過・失敗時のエラー通知ができる
- タスクの並列実行機能がある
データ分析基盤の新規構築や見直しの際に、本記事のような、OSSのワークフロー管理ツールの導入し、
ETL処理、DBの定期更新や定型レポートの作成など、一連の定型処理を自動化すれば、
リードタイム・コストダウンの削減につながります。


コメント投稿