Treasure Data - Support Engineering Team blog

トレジャーデータのサポートエンジニアリングチームのブログです。

Incremental Loading とは

こんにちは、Treasure Data(TD) サポートの伊藤です。

今回は差分インポートするための機能である Incremental Loading 機能について触れようと思います。

差分インポートとは

例えばあるデータベース(MySQLなど)のあるテーブルから、TDとデータ連携(TDへインポート)するケースを考えます。 そのテーブルは定常的にデータが投入されているとします。

データ連携するシンプルな方法として、該当テーブルから全レコードを取得して、TDへ格納する際に replace するという方法が考えられます。 テーブルサイズが小さい場合はこれで十分かと思います。

ですが、例えば対象テーブルのレコード数が数億件だとするとどうでしょうか? 毎日数億件のレコードを SELECT してTDにインポートするという処理は、重い処理だという印象を受ける方がほとんどかと思います。

そこで、インポートしていない差分となるレコードだけインポートしたいと誰しも考えるかと思います。それを実現する機能が Incremental Loading です。 差分と判定するロジックが要件に合致するのであれば、必要以上の負荷をTDとインポート元の双方にかけることなくデータ連携することが可能です。

Incremental Loadingを使うには

Incremental LoadingSource のオプション機能です。TDコンソールの Incremental? にチェックを入れることで利用できます(デフォルトでチェックが入っています)。

incremental loading 設定箇所

全てのサービスに対して Incremental Loading 機能が利用できるわけではないことには注意してください。例えば Google Sheets からのインポートでは Incremental Loading は利用できません。

差分判定ロジックについて

Incremental Loading に限った話ではありませんが、差分インポートするにはインポート対象のデータの何かしらをもって、差分データを判別する必要があります。 Incremental Loading の判定ロジックは連携先サービスによっていくつか分類できるため、それぞれ紹介していきます。

辞書式ソートによる判定(S3、SFTPなど)

ファイルを配置する系統のサービス(AWS S3 や SFTP)の場合は、利用者が設定したPath prefixに合致するファイル達を辞書式でソートして、最後になったファイルパスを記憶するという挙動になっています。 そうすることで次回実行時に、記憶していたファイルパスより辞書式でソートした結果後ろにくるファイルなのであれば、晴れてインポート対象と判別されます。

理解しやすくするために具体的に Path prefix が 202011 で、初回実行時に下記ファイルが存在している場合で考えましょう。

20201126.csv
20201127.csv
20201128.csv

Path prefix に 202011 が設定されているため、202011xxxxxxx というファイルパスであればインポート対象となります。 そのため初回実行時は下記3ファイルがインポート対象となります。そして辞書式で並べると 20201128.csv が最後になるため、差分判定のためにこのファイルパスが記憶されます。

20201126.csv <-- インポートされる
20201127.csv <-- インポートされる
20201128.csv <-- インポートされる(次回実行時用に記憶される)

このSourceをもう一度実行すると、20201125.csv20201129.csv という2つのファイルが新たに配置されていたとします。 その場合、当然 Path prefix 202011 に合致する必要がありますが、辞書式でソートしたときに前回記憶していた 20201128.csv 以降となる必要があります。 これらの条件を満たすものは、存在している5つのファイルのうち 20201129.csv のみであり、今回のインポート対象となります。

20201125.csv は実際にはインポートされていないファイルなのですが、判定ロジックでは差分とみなされずインポートされません。

20201125.csv
20201126.csv <-- インポート済
20201127.csv <-- インポート済
20201128.csv <-- インポート済(次回実行時用に記憶)
20201129.csv

ちなみにS3の場合のみ、更新日付を元に差分判定させることが可能です。詳細はドキュメントをご参照ください。

クエリのWHERE句で値の大小関係による判定(MySQL、Big Queryなど)

MySQL や Big Query のようにクエリを発行することでインポートするレコードを取得する場合は、そのクエリにおけるWHERE句での値の大小で差分を判定しています。 Incremental? にチェックを入れて、差分判定するカラム名を設定します。

Incremental Loading for MySQL

この設定をした Source を実行すると、実際には下記クエリがDB(MySQL)に対して実行されます。バインド変数(クエリ内の ? です)には、前回実行時の最大値が格納されるため、差分判定するカラム(下記では col1)の値が前回実行時にインポートした最大値よりも大きいレコードはすべて差分として判定されます。

SELECT ...
  FROM `target_table`
 WHERE ((`col1` > ?))
...

この条件に合致しないレコードなのであれば、それが前回インポート後に UPDATE されたレコードだとしても、差分判定する条件に合わないためインポートされることはありません。

対象期間を変動させる(Google Analyticsなど)

Google Analytics Reporting のように start_date などを指定しインポート期間を指定するタイプの場合、Incremental Loading を利用すると、start_date が実行するごとに変わっていく挙動になります。先述した他の2つと比較するとシンプルですね。

Workflow の td_load>: オペレーターの場合

Workflow上で Data Connector を利用する td_load>: オペレータには2通りの使い方がありますが、yamlファイルを指定する場合は Incremental Loading を利用することができません。

利用方法 Incremental Loading
インポート設定を yaml ファイルとして記載し指定 利用不可
既存の Source の Unique IDを指定し、該当Sourceを実行 利用可能

代わりに yaml ファイル内では Workflow の変数を利用できるため、そちらを利用することでインポート対象を実行される度に変更することができます。

例えば下記ケースをどのようにして実現するのかを考えてみましょう。

  • インポート対象のcsvファイルは AWS S3 に配置される
  • ファイルパスの命名規則/target_dir/yyyymmdd_test.csv で、yyyymmdd はファイルが配置された日時になる
  • Workflowを日次で実行し、実行日の日付をファイルパスに含むもののみをインポート対象とする

ファイルパスは yaml ファイルの path_prefix で指定できます。 また、実行日はビルトイン変数session_date_compact が良さそうです。

実際に実装してみると下記のようになります。

in:
  type: s3
  access_key_id: your_access_key_id
  secret_access_key: your_secret_access_key
  bucket: your_bucket
  path_prefix: /target_dir/${session_date_compact}_test.csv #ビルトイン変数を利用してパスを指定
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    skip_header_lines: 1
    columns:
...

2020/12/09 に実行されると session_date_compact=20201209 となるため、/target_dir/20201209_test.csv というファイルが存在すればインポートしてくれます。

上記は一例に過ぎませんが、Workflowは Moment.js がバンドルされており、変数内 ${....} では簡単な JavaScript が動作する(Moment.js含め)ため、柔軟にインポート対象を絞り込むことができるでしょう。

Incremental Loading の弊害

Incremental Loading はデフォルトで有効になっているのですが、利用者がそれに気づかないことによる弊害があります。

よくサポート問い合わせを頂戴するケースとして、Sourceを実行したのに1レコードもTDにインポートされないということが挙げられます。設定などを確認させていただくと、 Incremental Loading が有効化されていて、同じSourceを以前実行したことがある形跡があります。

その場合、先述した通り前回実行時の情報をベースにインポート対象が決まりますので、そのロジックに合致するインポート対象が存在しない場合は0件インポートとなってしまいます。

Incremental Loading になっているかどうかは対象ジョブの情報を参照したときの Query タブで確認することができます。"incremental": true が設定されていれば Incremental Loading が有効化されています。

"incremental": true

そして、 Incremental Loading で差分だと判定する基準については、config_diff で確認することができます。

config_diff

デフォルトで有効化されているが故に、意図して設定していないことで期待したインポートができないということがありえます。もしそういった事象に遭遇した際はログを見る他に上記確認してみると良いかもしれません。

最後に

本記事を読んでいただき、Incremental Loading について理解を深めていただけたのであれば幸いです。

毎回インポート対象が同じケースは少ないと思いますので、本機能を利用してTDを有意義に利用していただけると嬉しいです。