こんにちは、Treasure Data(TD) サポートの伊藤です。
今回は差分インポートするための機能である Incremental Loading
機能について触れようと思います。
- 差分インポートとは
- Incremental Loadingを使うには
- 差分判定ロジックについて
- Workflow の td_load>: オペレーターの場合
- Incremental Loading の弊害
- 最後に
差分インポートとは
例えばあるデータベース(MySQLなど)のあるテーブルから、TDとデータ連携(TDへインポート)するケースを考えます。 そのテーブルは定常的にデータが投入されているとします。
データ連携するシンプルな方法として、該当テーブルから全レコードを取得して、TDへ格納する際に replace するという方法が考えられます。 テーブルサイズが小さい場合はこれで十分かと思います。
ですが、例えば対象テーブルのレコード数が数億件だとするとどうでしょうか? 毎日数億件のレコードを SELECT してTDにインポートするという処理は、重い処理だという印象を受ける方がほとんどかと思います。
そこで、インポートしていない差分となるレコードだけインポートしたいと誰しも考えるかと思います。それを実現する機能が Incremental Loading
です。
差分と判定するロジックが要件に合致するのであれば、必要以上の負荷をTDとインポート元の双方にかけることなくデータ連携することが可能です。
Incremental Loadingを使うには
Incremental Loading
は Source
のオプション機能です。TDコンソールの Incremental?
にチェックを入れることで利用できます(デフォルトでチェックが入っています)。
全てのサービスに対して Incremental Loading
機能が利用できるわけではないことには注意してください。例えば Google Sheets からのインポートでは Incremental Loading
は利用できません。
差分判定ロジックについて
Incremental Loading
に限った話ではありませんが、差分インポートするにはインポート対象のデータの何かしらをもって、差分データを判別する必要があります。 Incremental Loading
の判定ロジックは連携先サービスによっていくつか分類できるため、それぞれ紹介していきます。
辞書式ソートによる判定(S3、SFTPなど)
ファイルを配置する系統のサービス(AWS S3 や SFTP)の場合は、利用者が設定したPath prefixに合致するファイル達を辞書式でソートして、最後になったファイルパスを記憶するという挙動になっています。 そうすることで次回実行時に、記憶していたファイルパスより辞書式でソートした結果後ろにくるファイルなのであれば、晴れてインポート対象と判別されます。
理解しやすくするために具体的に Path prefix が 202011
で、初回実行時に下記ファイルが存在している場合で考えましょう。
20201126.csv 20201127.csv 20201128.csv
Path prefix に 202011
が設定されているため、202011xxxxxxx
というファイルパスであればインポート対象となります。
そのため初回実行時は下記3ファイルがインポート対象となります。そして辞書式で並べると 20201128.csv
が最後になるため、差分判定のためにこのファイルパスが記憶されます。
20201126.csv <-- インポートされる 20201127.csv <-- インポートされる 20201128.csv <-- インポートされる(次回実行時用に記憶される)
このSourceをもう一度実行すると、20201125.csv
と 20201129.csv
という2つのファイルが新たに配置されていたとします。
その場合、当然 Path prefix 202011
に合致する必要がありますが、辞書式でソートしたときに前回記憶していた 20201128.csv
以降となる必要があります。
これらの条件を満たすものは、存在している5つのファイルのうち 20201129.csv
のみであり、今回のインポート対象となります。
20201125.csv
は実際にはインポートされていないファイルなのですが、判定ロジックでは差分とみなされずインポートされません。
20201125.csv 20201126.csv <-- インポート済 20201127.csv <-- インポート済 20201128.csv <-- インポート済(次回実行時用に記憶) 20201129.csv
ちなみにS3の場合のみ、更新日付を元に差分判定させることが可能です。詳細はドキュメントをご参照ください。
クエリのWHERE句で値の大小関係による判定(MySQL、Big Queryなど)
MySQL や Big Query のようにクエリを発行することでインポートするレコードを取得する場合は、そのクエリにおけるWHERE句での値の大小で差分を判定しています。
Incremental?
にチェックを入れて、差分判定するカラム名を設定します。
この設定をした Source
を実行すると、実際には下記クエリがDB(MySQL)に対して実行されます。バインド変数(クエリ内の ?
です)には、前回実行時の最大値が格納されるため、差分判定するカラム(下記では col1
)の値が前回実行時にインポートした最大値よりも大きいレコードはすべて差分として判定されます。
SELECT ... FROM `target_table` WHERE ((`col1` > ?)) ...
この条件に合致しないレコードなのであれば、それが前回インポート後に UPDATE されたレコードだとしても、差分判定する条件に合わないためインポートされることはありません。
対象期間を変動させる(Google Analyticsなど)
Google Analytics Reporting のように start_date などを指定しインポート期間を指定するタイプの場合、Incremental Loading
を利用すると、start_date
が実行するごとに変わっていく挙動になります。先述した他の2つと比較するとシンプルですね。
Workflow の td_load>: オペレーターの場合
Workflow上で Data Connector
を利用する td_load>:
オペレータには2通りの使い方がありますが、yamlファイルを指定する場合は Incremental Loading
を利用することができません。
利用方法 | Incremental Loading |
---|---|
インポート設定を yaml ファイルとして記載し指定 | 利用不可 |
既存の Source の Unique IDを指定し、該当Sourceを実行 |
利用可能 |
代わりに yaml ファイル内では Workflow の変数を利用できるため、そちらを利用することでインポート対象を実行される度に変更することができます。
例えば下記ケースをどのようにして実現するのかを考えてみましょう。
- インポート対象のcsvファイルは AWS S3 に配置される
- ファイルパスの命名規則は
/target_dir/yyyymmdd_test.csv
で、yyyymmdd
はファイルが配置された日時になる - Workflowを日次で実行し、実行日の日付をファイルパスに含むもののみをインポート対象とする
ファイルパスは yaml ファイルの path_prefix
で指定できます。
また、実行日はビルトイン変数の session_date_compact
が良さそうです。
実際に実装してみると下記のようになります。
in: type: s3 access_key_id: your_access_key_id secret_access_key: your_secret_access_key bucket: your_bucket path_prefix: /target_dir/${session_date_compact}_test.csv #ビルトイン変数を利用してパスを指定 parser: charset: UTF-8 newline: CRLF type: csv skip_header_lines: 1 columns: ...
2020/12/09 に実行されると session_date_compact=20201209
となるため、/target_dir/20201209_test.csv
というファイルが存在すればインポートしてくれます。
上記は一例に過ぎませんが、Workflowは Moment.js がバンドルされており、変数内 ${....}
では簡単な JavaScript が動作する(Moment.js含め)ため、柔軟にインポート対象を絞り込むことができるでしょう。
Incremental Loading の弊害
Incremental Loading
はデフォルトで有効になっているのですが、利用者がそれに気づかないことによる弊害があります。
よくサポート問い合わせを頂戴するケースとして、Sourceを実行したのに1レコードもTDにインポートされないということが挙げられます。設定などを確認させていただくと、 Incremental Loading
が有効化されていて、同じSourceを以前実行したことがある形跡があります。
その場合、先述した通り前回実行時の情報をベースにインポート対象が決まりますので、そのロジックに合致するインポート対象が存在しない場合は0件インポートとなってしまいます。
Incremental Loading
になっているかどうかは対象ジョブの情報を参照したときの Query
タブで確認することができます。"incremental": true
が設定されていれば Incremental Loading
が有効化されています。
そして、 Incremental Loading
で差分だと判定する基準については、config_diff
で確認することができます。
デフォルトで有効化されているが故に、意図して設定していないことで期待したインポートができないということがありえます。もしそういった事象に遭遇した際はログを見る他に上記確認してみると良いかもしれません。
最後に
本記事を読んでいただき、Incremental Loading
について理解を深めていただけたのであれば幸いです。
毎回インポート対象が同じケースは少ないと思いますので、本機能を利用してTDを有意義に利用していただけると嬉しいです。