東京 Ruby 会議 11 直前特集号

17:55 Keynote

分散ワークフローエンジン『Digdag』の実装

ワークフローエンジンは、依存関係のある複数のタスクを実行するツールです。古典的なMakefileを始め、Python製のLuigi、商用のJP1/AJS3など、様々な実装があります。Digdagは、現在活発に開発が進んでいる新しいワークフローエンジンです。このセッションでは、ツリー構造を持つワークフローの状態遷移を効率的に実行する手法、ワークフローの実行状態の永続化と分散実行、タスクスケジューラ、Dockerによるサンドボックス化など、その実装手法を紹介しながらワークフローエンジンの構成技術について解説します。

必要となる知識

SQLの基本的な知識と、Rubyのリフレクション、オプティマイザなどの知識があると、より楽しめると思います。

古橋 貞之
Treasure Data, Inc. Founder & Software Architect

分散データベース「PlazmaDB」や分散Key-Valueストア「Kumofs」など、十年にわたって分散システムの開発に携わる。また、バイナリシリアライズ形式『MessagePack』、ログコレクタ『Fluentd』、並列ETL『Embulk』などのオープンソースプロジェクトを創始した。


5/17 Skypeにて伺いました(話し手:古橋さん、聞き手:笹田)。

笹田 今日は、時間を取って下さってありがとうございます。今回は「分散ワークフローエンジン『DigDag』の実装」というタイトルですが、どういう話になるんでしょうか。

古橋 ワークフローエンジンとは何であり、なぜワークフローエンジンが必要か、それから実装上のテクニックについて話をしようと思います。

DigDag とは

笹田 「DigDag」というのは、ゲームの名前(ディグダグ)から採ったんでしょうか。

古橋 いや、必ずしもそうではなく、私はディグダグをやったことがありません。偶然近い名前になりました。DAG(Directed Acyclic Graph: 有向非巡回グラフ)を掘っていく(dig)ところから、こういう名前になりました。最初はコードネームだったんですが、響きが良いので、このまま名前になりました。

笹田 すでに、ワークフローエンジンについては既存のものが沢山あるけれど、それらでは不足していたから作り始めた、ということでしょうか。

古橋 僕たちが使いたいものにマッチしたものが無かったんですよね。

笹田 どのあたりがマッチしていなかったんでしょうか。利用するフィールドが違う、とかでしょうか。

古橋 一番大きな要求というのは、コードとしてワークフローを定義したかったというのがあります。GUI でのドラッグアンドドロップで作るのではなく、テキストで記述し、Git でバージョン管理し、コードレビューが行える、デプロイが再現可能な形で行えるという、ソフトウェア開発で行なっているベストプラクティスでワークフローを管理したかった。これができるツールが、まず少ないです。

笹田 古典的には、make があると思います。

古橋 はい。ワークフローエンジンって沢山あって、例えば、make や Airflow などがあります。ただ、プログラムが書ける、となると、完全にプログラマ向けになってしまうというのがあります。それを避けて、必ずしもプログラマじゃなくても使えるツールにしたかっというのがあります。具体的には、データ解析をする人たちが使えるツール。

笹田 GUI ではなく、テキストで記述できるが、プログラマではなくてもわかりやすい DSL を提供したかった、ということでしょうか。

古橋 その通りです。そのやり方は、僕がまさに Fluentd や Embulk で行なっていたことに似ていて、同じやり方を持ってこよう、というプロジェクトです。意外に、こういう方針のソフトウェアはないんですよね。

笹田 データ解析を目標になると、処理の分散や、分散処理も当然サポートされる、と。

古橋 はい。このプロジェクトのひとつの課題に、マルチテナントに対応しなければならない、というものがあります。というのも、ワークフローをサーバ側で実行したいからです。つまり、サーバにアカウントを作って、ローカルで動作を確認したワークフローを、Git のようにプッシュすると、サーバで定期的にそのワークフローが走るようにしたいんです。

笹田 なるほど。

古橋 今、こういうツールを作るには良い時期だと思っています。というのも、Docker など、コンテナ上で隔離された専用の環境でプログラムを実行する、というシステムがいろいろ揃ってきました。これを使うと、サーバサイドで自分の実行したいプログラムを実行することが比較的簡単に行えます。例えば、Travis CI や Circle CI も、そんな感じで作られていますが、それらはテストという用途に特化したワークフローエンジンと言えると思います。

DigDag の実装

古橋 今回、実装上難しいところが 4 点ありました。

  1. 限られたリソースの元でのフェアネスを考慮したタスクのスケジューリング(実行)
  2. タスクのステータスの管理
  3. タスクの実行(タスク間のコミュニケーションのデザイン)
  4. パラメータの評価 – YAML を使う、JavaScript を書けるように

笹田 それぞれ説明して貰ってもいいでしょうか。

古橋 はい。まず、1. はわかると思いますが、分散環境で多くの計算機リソースがある中、沢山のタスクをどうやって実行していくか決定する、というのは、一つの課題になります。しかも、それをなるべく公平に実行しないといけないわけですから。

笹田 なるほど。

古橋 次に、2. タスクのステータスの管理です。ワークフローエンジンは、タスクの依存関係を管理し、順々に実行していくことになります。あるタイミングで実行できるかどうかは、依存しているタスクが完了しているか、または失敗しているかなどの状態で決まります。この状態を管理するのが意外に難しい。マルチテナント上で動かしたいので、これを RDB で管理しています。つまり、SQL を大量に投げて、MVCC を使って、ステータス管理をトランザクショナルに更新処理を行なっています。

笹田 データベースで管理するんですね。

古橋 ステートマシンを RDB 上で実装している、と言ってもいいかもしれません。発表では、実際のクエリをお見せできると思います。

笹田 DB で作るから、という難しさがあったんでしょうか。

古橋 そもそも、この管理が難しいというのがあります。例えば、ステートがいろいろ種類があります。待機中、実行中というのはありますが、加えて、失敗とか、リトライ待ちとか、キャンセルされた、とか。そうすると、次に実行可能なタスクのリストは必ずしも自明ではありません。しかも、複数のサーバで実行しているので、これらのステータスの変更が、複数のサーバから同時にやってくるわけです。

笹田 なるほど。

古橋 3 番目が、タスクを実際に実行するところです。非エンジニアでも簡単に使えるようにしたいという目標があります。そのために、オペレータプラグインというのを提供しています。使う人は、このオペレータプラグインに、パラメータを指定して実行することになります。例えば、その一つは一般的な Ruby のスクリプトを実行できる、というものになります。

笹田 使う人は、それを使うだけで良い、ということですね。

古橋 さて、この Ruby スクリプトとの間でどうやってパラメータを受け渡すか、というのは難しい問題になります。なぜなら、メソッドを探し、Ruby を Docker の中で実行し、パラメータを渡し、エラーが起きたら例外をキャッチし、適切に上に伝達し、かつその状態を次のタスクで取れるようにする、ということが必要になります。この中身の実装を発表では紹介できればいいなと思っています。

笹田 これは、各タスクのコミュニケーションのデザインということですかね。

古橋 そうです。例えば、最初のタスクは Ruby で書いたものだけど、次のタスクは他の人が Python で書いた、みたいなものにも対応しないといけません。

笹田 なるほど。では 4 番目はどうでしょう。

古橋 パラメータをどうやって評価するか、という問題になります。単純なワークフローは、単に実行するだけですが、例えばタスクの実行結果によって、次に動かすタスクを変えたい、とか、失敗したらどうする、といった分岐が入ってくることになります。

笹田 使いたいですね。

古橋 つまり、前のタスクのパラメータを使って次のタスクの実行を変えたい、と。それから、外から与えらたパラメータがあったりとかします。それをどうやってワークフローで使えるようにするか、というのが結構難しい問題になります。なぜなら、ここは一番ユーザが利用するところだからです。さらに、いろんなユーザがいるので、いろんな要求がある。そして、あまり複雑にすると、非エンジニアが使えなくなってしまう。

笹田 難しいところですね。

古橋 そこで、DigDag では、YAML を使うようにしています。YAML では、当然パラメータなどを指定出来ます。さらに、その中に JavaScript を書けるようにしています。

笹田 JavaScript in YAML。

古橋 その JavaScript を評価して、外から変数の値を入れらるようになっていて、その結果をパラメータとしてオペレータに渡せるようになっています。

笹田 つまり、DSL のデザインとして、YAML とその中の JavaScript を提供しており、単純な場合は YAML だけ書いておけばよく、何か凝ったことがしたければ、JavaScript でロジックが記述できるよ、ということでしょうか。

古橋 その通りです。

笹田 技術的困難だった 4 点について、1、2 がエンジニアリングの話で、3、4 がデザインの話、ということになるでしょうか。

古橋 そうなると思います。

笹田 大きなシステムだから、沢山の話が出てきて本当に面白そうですね。

そのほか

古橋 例えば、笹田さんがワークフローエンジンで解決したい問題ってありますか?

笹田 直接解決したい問題じゃないんですが、分散システム上にセットアップするのが簡単だといいな、と思います。

古橋 そういう要求は凄い一般的です。ある会社で、機械学習を作ったけれど、それが R で書かれていて、分散実行できない状況だけど、データ量が多いから当然分散処理したい。そこで、凄い単純なシステムを作って便利だった、という話がありました。入ってきたデータを分割して、各サーバに分散し、GitHub においてあるスクリプトで処理して結果を返すだけのシステム。DigDag は、例えば、そういうのも簡単にできるようにしようと思っています。

笹田 良さそう。

古橋 そのために、インストールを凄い簡単にしたいと思っています。僕が作るツールは、基本的に 1 コマンドでインストール出来るようにしています。タスクを実行するエージェントは、プログラムなどデータを取ってきて、それを評価する、というようにしようと思っています。

笹田 そのエージェントを動かすマシン上で、1 コマンド実行するということになりますか。

古橋 はい。そのマシン上で、サーバのアドレスを指定してコマンドを実行すると、サーバから実行に必要なバイナリなどをとってきて、すぐに実行できるようになります。

笹田 それは便利そうですね。

古橋 まだ、全部はできていないですけどね。

笹田 タスクの状況を、絵で綺麗に見れたりするんでしょうか。

古橋 そこは完全に TODO ですね。今のところ、コマンドラインだけです。リトライの指示や、ワークフローの再定義なども、現状はコマンド操作で行なうことになります。

笹田 なるほど、楽しみです。

古橋 これで、Treasure Data を使う人が増えてくれるといいですね。

笹田 なるほど、Treasure Data 上では、すでに環境が整っているんですもんね。

古橋 一個重要なコンセプトは、ローカルで定義したワークフローを、そのままリモートで実行できるというところです。手元で開発して、それをプッシュすれば、サーバ上でちゃんと動く。ちゃんと、オペレータを使うとか、スクリプトはコンテナで使うといった、標準的な方法を使っていれば、問題無くできると思っています。他のツールでは、必ずしもそうなっていないことが多いですよね。サーバにいろいろセットアップしないといけなくて、再現性のない環境に依存することになってしまう。DigDag では、完全に再現性のあるワークフローを作りたいと思っています。データサイエンティスト界隈でも、それが問題になっていて、論文の結果が再現できない、といったことが言われているそうです。

笹田 大事な特長ですね。

古橋 今回、できるだけ質疑応答の時間を取りたいと思っています。というのも、ワークフローエンジンへの要求を聞いてみたい、というのがあります。

笹田 いろいろ聞けるといいですね。あと、すぐ後に懇親会があるので、そこで議論をしてもらうこともできると思います。では、発表楽しみにしています。今日はお忙しいところ、ありがとうございました。