Stimulator

機械学習とか好きな技術話とかエンジニア的な話とかを書く

機械学習パイプライン構築を楽にするgokart-pipelinerを作った

- はじめに -

luigi、gokartで作ったtaskのパイプライン構築をちょっと楽にする(かもしれない)管理するためのツールを作った。

github.com

近年、MLOpsの一部である機械学習のためのパイプラインを構築するためのツールは飽和状態にあるけどそれらと比較してどうなのという話も書く。

gokart-pipelinerを使ってみる

gokartはエムスリー株式会社が開発している機械学習パイプラインOSSである。gokart自体、使った事がないし興味もないという人も居るかもしれないが、一度以下に提示するgokart-pipelinerの例を見てほしい。

# pip install gokart_pipeliner
import luigi
import gokart
from gokart_pipeliner import GokartPipeliner

# taskを定義する
class TaskA(gokart.TaskOnKart):
    def run(self):
        self.dump(['foo'])

class TaskB(gokart.TaskOnKart):
    before_task = gokart.TaskInstanceParameter()
    text = luigi.Parameter()

    def run(self):
        x = self.load('before_task')
        self.dump(x + [self.text])

# パラメータとパイプラインを書く
params = {'TaskB': {'text': 'bar'}}
pipeline = [TaskA, TaskB]

# run
gp = GokartPipeliner()
gp.run(pipeline, params)

luigiやgokartでは、1つのタスクを1classで表現する。TaskAは ["foo"] を保存するだけのタスク。TaskBはparameterに設定したbefore_taskを読み込んで、同じくparameterに設定した text を末尾に追加するタスク。GokartPipelinerがその2つのタスクをよしなに接続し、パラメータと共に実行している。

gokartの良さ

手前味噌にgokartの宣伝をするならば、この時以下のようなメリットがある

  • それぞれのTaskの以下データが別々にハッシュ値付きでpklファイルに保存される
    • self.dumpしたデータ
    • importした全てのmoduleのversion
    • Taskの処理時間
    • Task内で使われた全てのrandom_seed
    • 出力されるログ
    • Taskのクラス変数として設定された全てのparameterの値
  • TaskBのparameterを変えて実行した時は別のハッシュ値で上記ファイルが生成される
  • TaskAに新たにparameterを追加した場合はTaskBのハッシュ値も変わり依存関係を考慮して両方rerunされる
  • TaskA、TaskB間は上記の出力を中間ファイルとしてやり取りされるためメモリに優しい
  • dumpの出力ファイル形式を拡張できる(デフォルトでもcsv、zip、feather、png、…などをサポート)
  • 入出力時のpandas.DataFrameの型、columnチェック機能がある
  • 保存ファイルのディレクトリ構成がPythonスクリプトの構成から自動的に決まる
  • 基本的なnumpy、randomのシードは自動で固定化される
  • SOLID原則をなるべく守りながらコーディングできる
  • 保存された出力データとparameter、hashの管理はthunderboltなる別ライブラリでPython上で行える
  • 並列にタスクが動作してもredis経由でTaskがロックされる

機械学習モデリングにおいて、何をロードして、何を出力し、どのように繋げるか以外の殆どを自動的に決定し保存する仕組みになっている。また、その上でクラス単位でTaskを作る事によるソフトウェア開発における単一責任の原則などを守りやすくなっている。

近年は、機械学習パイプラインツールの戦国時代でもある。他の多種多様なライブラリと比較しても、モデリング時やproductionの再現性のための中間出力が多いし、Pythonコード上で見た時、デコレータや謎のメソッドがチェーンされまくったコードよりは保守性が高くなるはずだ(gokartを学ぶコストさえ払えば)。

例えば「pandas.DataFrameのtext columnから文字列の長さのcolumnを生成する」という処理は、無闇に大きな関数やスクリプトにせず、1つのファイルに1つのTaskとして考えて以下のように書いていく。

class CalcTextLengthTask(gokart.TaskOnKart):
    target = gokart.TaskInstanceParameter()
    __version = luigi.IntParameter(default=1)

    def run(self):
        df = self.load_data_frame('target', required_columns={'id', 'text'}, drop_columns=True)
        df['text_length'] = df['text'].str.len()
        self.dump(df[['id', 'text_length']])

gokartにおいては、1Taskの規模感さえ一致すれば、この書き方以外でコードを書くのは難しく、コードレビューや保守が行いやすい。また、このCalcTextLengthTaskの入力となるtarget taskを変える事で、hash値等のメリットを享受しながら使い回す事ができる。例えば機械学習モデルの汎用的なTrainタスクを書いておいてTaskInstanceParameterのみ変えるといった具合に。

また、parameterで中間ファイルのhash値が変わる事を利用して、__versionのようなパラメータを雑に付けてあげれば、「長さを測る前にstripしてからという処理に変更」した時にversion=2としてcommitしておくことで、あとからgitのlogをblameしたり、出力されるhash値付きのpklファイルを見比べる事でデバッグが行いやすくなる。


加えて、wrapしているluigiとの比較は以下のスライドを参考に。
gokartの運用と課題について - Speaker Deck
何もないluigiを書くよりも書きやすいと思えるはずである。

 

gokart-pipelinerの意義

gokartでモデリングしたり、コンペに出たり、会社での本番運用を重ねていく中でいくつか課題になってきた以下のような点を解決しようとしたのがgokart-pipelinerである。

  • パラメータとパイプラインが密結合しすぎ
  • パイプラインライブラリなのにやればやるほどrequiresメソッドが複雑になる
  • jupyter notebookと行き来するのがダルい
パラメータとパイプラインが密結合しすぎ

パラメータとTaskの動作が分離しているパイプラインは、近年の流行となりつつある。

特にFacebook社の公開したHydraはかなり大きな転機だったように感じる。yamlとデコレータを軸としたパラメータ管理で、yamlファイルさえ管理していればどんなパイプラインを書いても良いし、かなり管理も楽である。

一方でデコレータは闇魔法を生みやすいし、デコレートした謎の巨大な関数を見るのはツライので、もう少しコーディングに制約を持たせた形のパイプラインを作りたいと思っていた。(他人の書いたHydra+mlflowのコード見るのしんどすぎない?)

luigiにもconfigParserを使ったiniやyamlファイルを読んでパラメータとする機能はある。しかし、gokartにもTaskInstanceParameterというやつが居る。これ自体はtaskを依存関係の一部と捉えられる良い機構ではあるものの、Parameterという扱いとしてyamlのように一箇所で管理できないネックがあった。

gokart-pipelinerの場合を見てみる。

from gokart_pipeliner import GokartPipeliner
from ExampleTasks import *

pipeline = [TaskA, {'task_b': TaskB, 'task_c': TaskC}, TaskD]
params = {'TaskA': {'param1':0.1, 'param2': 'sample'}, 'TaskD': {'param1': 'foo'}}

gp = GokartPipeliner()
gp.run(predict, params=params)

pipelineは「Task同士のTaskInstanceParameterによる依存関係のみい」を表し、paramsは「各タスクのそれぞれのluigi.Parameter」を表していて、切り分けられている。元々のluigiのconfig形式にも対応しているので、configファイル、pipeline、paramsをそれぞれ考えつつ、この構成だけ保存しておけば一元管理もできる。

パイプラインライブラリなのにやればやるほどrequiresメソッドが複雑になる

gokartの機能としてrequiresというクラスメソッドがある。これは、読み込むデータを指定するメソッドで、requiresが返す値でluigiが依存タスクを決めている。

先程のタスクをgokart-pipelinerを考えず書いた場合は以下のようになる

class CalcTextLengthTask(gokart.TaskOnKart):
    target = gokart.TaskInstanceParameter()

    def requires(self):
        return self.target

    def run(self):
        df = self.load_data_frame('target', required_columns={'id', 'text'}, drop_columns=True)
        df['text_length'] = df['text'].str.len()
        self.dump(df[['id', 'text_length']])

このrequiresは以下のようにlistやdictを返したりもできる。

    def requires(self):
        return {'target': self.target, 'model': self.clone(MakeModelTask)}

更に複雑に、依存関係やパラメータ、分岐を書く事もできる。

    def requires(self):
        data = TrainTestSplit(data=MakeData(path='/'), split)
        if self.parameter_a == 'var':
            task = MakeModelTask(data=data, param_a=0.1, param_b='foo')
        else:
            task= MakeModelTask(data=self.data)
        return {
            'data': data,
            'model': self.clone(task)}

この状態では、コンペのような実験とコーディングを繰り返す時に、依存関係がどうなってるか把握するのがどんどんしんどくなる。gokartに依存関係Treeを出力する機能があるが、流石にしんどい。こういった複雑なrequiresを集約するエンドポイントになるTaskを作ったりするが、次はそのエンドポイントからしか全体が実行できなくなったりしていくし、エンドポイントが増えれば増えるほど、どのファイルを見て回ればいいか分からなくなる。その上で途中にTaskを挿入したいとなったら、と考えるとただただ辛くなる。

なので、そもそもrequiresを書かない制約を付ければ良い。


例えば、gokart-pipelineでの先程の例を考える。

pipeline = [TaskA, {'task_b': TaskB, 'task_c': TaskC}, TaskD]
params = {'TaskA': {'param1':0.1, 'param2': 'sample'}, 'TaskD': {'param1': 'foo'}}

ここでTaskDは、リストの1つ前のdictを引数にするように、以下のように書いたクラスである。

class TaskD(gokart.TaskOnKart):
    task_b = gokart.TaskInstanceParameter()
    task_c = gokart.TaskInstanceParameter()
    param1 = luigi.Parameter()

    def run(self):
        b = self.load('task_b')    # list
        c = self.load('task_c')    # list
        data = b + c + [self.param1]
        self.dump(data)

requiresメソッドは、TaskInstanceParameterのパラメータ名から、gokart-pipelineが生成する。この制約によって複雑なrequiresを書かれる事もなく、pipelineのlistだけを変数やdictを使って上手く書いてやれば良いだけになる。

jupyter notebookと行き来するのがダルい

gokart自体をjupyter notebookで動かす方法はあるものの、かなりハックじみた方法となる*1

gokart-pipelinerはjupyter notebook上で動く。
gokart-pipeliner/Example.ipynb at main · vaaaaanquish/gokart-pipeliner · GitHub

Task同士は中間ファイルでやり取りされるのでメモリをバカ食いしないし、classである事さえ意識して一般的なソフトウェア開発の心得に沿って書けば、ここで書いたコードをそのままproductionコードにするのも容易になる。

もちろんタスクを動かした後、thunderboltを使って出力ファイルをメモリに読み込むなどして、jupyter上で触っても良い。
github.com

future work

今後上手く使えてきたらできそうなこと

  • pipelineのリストの書き方のベストプラクティスを探る
  • runの返り値として出力データを返したりできるようにする
  • pipelineを決めたら並列化できる所を自動で並列に動作させたりする
  • jupyter notebookからも別のプロセスとして動かす(Taskが動く最中もnotebookが実行できる)

おわりに

試しに作ってみた段階なので、これからコンペに出たり本番運用に使ってみたりして調整していきたい。

gokartは良いぞという話を散々書いたが、gokartは中間ファイルを経由するだけにDataFrameを良く使うテーブルデータでは使いやすく、画像コンペのような所では全然良さを発揮できなかったりする。いやいや画像音声テーブルなんでも自分のツールは共通化しておきたいわ、という人に不向きという所をなんとか改善できればと思ってはいる。

なんとなく形になったら、gokartにマージしてもらって、gokartの定番となっても良い気もする。

がんばろう。

 

*1:機械学習プロジェクト向けPipelineライブラリgokartを用いた開発と運用 - エムスリーテックブログ https://www.m3tech.blog/entry/2019/09/30/120229 を参照すると良い