こんにちは、Treasure Dataサポートの伊藤です。
本記事では、Treasure Dataの要と言っても過言ではない Job
について詳細見ていきます。
Jobとは
Treasure Dataのコンポーネントとして、Jobというものがあります。
TDコンソールの Job Activities
にて確認できるもの、とイメージいただくと良いかと思います。
Job は処理の単位を表し、その種類ごとに目的や仕様などが変わります。下記概要になりますが、各種類について詳細は後述するので気になる方はご参照ください。
Jobの種類 | Job Activities上のラベル | 処理内容概要 |
---|---|---|
Hive | Hive | クエリエンジンHiveでSQLを実行したJob |
Presto | Presto | クエリエンジンPrestoでSQLを実行したJob |
Partial Delete | Partial Delete | td table:partial_deleteコマンドによって実行されるPrestoのDELETE文のJob |
Result Export | Result Export | HiveやPrestoによって抽出した結果をエクスポートするためのJob |
Bulk Load | Data Import | Data Connector(Source, Workflowのtd_load>:オペレータ)によるデータインポートのためのJob |
Bulk Import | Bulk Import Job | td import:autoコマンドや、Embulkのembulk-output-tdプラグインによるデータ投入のJob |
Bulk Export | Export | td table:exportコマンドなどによるAWS S3へテーブルをエクスポートするためのJob |
Hive Job
Treasure Dataはテーブルとして格納されているデータをSQLで抽出することができます。その際にクエリエンジンとして2種類選択肢があり、そのうちの1つが Hive
となります。
下記手順でSQLを記載して実行する画面に遷移できますが、 Type
にて Hive 2020.1 (stable)
を選択して実行した際に、Hive Jobが発行されます。
Data Workbench(歯車アイコン) --> Queries --> New Query ボタン
Workflowから Hive Job を実行することもできます。下記例のように td>:
オペレータを engine: hive
オプションとともに利用すると Hive Job が実行されます。
+run_hive_query: td>: query: "SELECT col1, col2 FROM test_table;" engine: hive engine_version: stable database: test_db
他にも Hive Job を実行する手段はいくつかありますが、今回は割愛します。
Presto Job
SQLにてデータ抽出する際に利用できるクエリエンジンのもうひとつの選択肢が Presto
です。
先述したクエリ実行画面にて Type
に Presto
を指定して実行した際に実行されます。
Hive Job と同様に、Presto Job も Workflow から実行することができます。
下記例のように engine: presto
オプションを指定すると Presto Jobが実行されます。
(engineオプションのデフォルト値は presto のため、本プションを省略した場合も Presto Job が実行されます)
+run_presto_query: td>: query: "SELECT col1, col2 FROM test_table;" engine: presto database: test_db
Presto Jobの場合も他の手段で実行することは可能ですが割愛します。
Partial Delete Job
Treasure Dataには Toolbelt と呼ばれるCLIツールがあります。このツールで td table:partial_delete
コマンドを実行すると、指定したテーブルのレコードの一部を削除するための Job である Partial Delete Job が実行されます。
$ td table:partial_delete test_db test_table --from 1568876400 --to 1568880000
Partial Delete Job と言っても実体は Presto Job で、例えば上記コマンドで実行されるジョブは下記クエリ文をPrestoで実行しています。
DELETE FROM test_table WHERE 1568876400 <= time AND time < 1568880000
Workflowで実行することもでき、下記サンプルのように td_partial_delete>:
オペレータを使用します。
+step1: td_partial_delete>: test_table database: test_db from: 2016-01-01T00:00:00+09:00 to: 2016-02-01T00:00:00+09:00
※ Partial Deleteリリース当初はPrestoのDELETE文でレコード削除することはできなかったため現在のような状況となっていますが、現時点ではDELETE文で柔軟に削除対象レコードを指定して削除することが可能です
Result Export Job
SQL(Hive/Presto)で抽出した結果を外部サービス(AWS S3やSFTPサーバーなど)へエクスポートすることが可能です。 この機能を利用する際に実行される Job を Result Export Job と呼びます。
Export Results
のチェックボックスにチェックを入れてエクスポート先を設定していると、SQLを実行する際に抽出結果を設定した宛先にエクスポートします。
あまりこの Result Export Job 自体を意識することはないかと思いますが、 Job Activities
でクエリ文が RESULT EXPORT FROM JOB <HiveかPresto JobのID>
となっているジョブが該当します。クエリ文にエクスポートするデータを抽出した際の job_id が記載されているので、どのデータをエクスポートしようとしているのかを確認することができます。
また、Workflowから Result Export Job を実行するには下記のようなサンプルになります。
Hive/Presto Jobを実行する必要があるので td>:
オペレータを利用します。エクスポートするための設定は result_connection:
と result_settings:
オプションで行い、 result_connection:
にはAuthentication名を、result_settings:
にはエクスポート先情報を指定することになります。
+run_presto_query: td>: query: "SELECT col1, col2 FROM test_table;" engine: presto database: test_db result_connection: test_authentication_s3 result_settings: bucket: test-bucket path: /202205/test.csv header: true
Bulk Load Job
外部サービス(例えばAWS S3やBigQueryなど)からTreasure Dataへインポートする際に実行される Job が Bulk Load です。
TDコンソールでこの Job を利用するには、 Source と呼ばれる機能を利用します。既存のSourceは下記で確認することができます。
Integrations Hub --> Sources
新たにSourceを作成するには、事前に Authentication と呼ばれる資格情報(ユーザー名やパスワード、秘密鍵鍵など)を設定するためのコンポーネントを作成しておく必要があります。 下記から連携先サービスを選択し、Authenticationを作成します。
Integrations Hub --> Catalog
Workflowから Bulk Load Job を実行する場合は td_load>:
オペレータを利用します。
yamlファイルにインポートするための設定を記述し、 td_load>:
オペレータでyamlファイルを指定する形で利用するケースが多いでしょう。
+load: td_load>: config/daily_load.yml database: ${td.dest_db} table: ${td.dest_table}
config/daily_load.yml
in: type: s3 access_key_id: ${secret:s3.access_key_id} secret_access_key: ${secret:s3.secret_access_key} bucket: test-bucket path_prefix: 20220725_test parser: charset: UTF-8 newline: CRLF type: csv skip_header_lines: 1 columns: - name: id type: long ...
td connector:issue
コマンドなど他の手段で実行することが可能な Job の種類ですが、今回は割愛します。
Bulk Import Job
Treasure Dataへインポートする際に実行されるJobの1種が Bulk Import Job です。 先述した Bulk Load Job とは名前が似ていますが全く別の種類のJobとなります。
下記のようなケースで発行される Job になり、その名の通りTreasure Dataへレコードをインポートします。
- CLI(Toolbelt)の td import:auto コマンド
- OSSのEmbulkにおいて、embulk-output-tdプラグインでTreasure Dataへアウトプットする(参考ドキュメント)
- Data Exchange(TD2TD)機能によってTreasure Dataへ書き出す(Result Export to Treasure Data)
Bulk Export Job
Treasure Dataのテーブルを AWS S3 へエクスポートするという限られた目的のためのJobが Bulk Export Job です。 外部サービスにエクスポートするという点では Result Export Job と同じですが、それとは異なりBulk Export JobはSQLで抽出した結果をエクスポートするわけではありません。 また、Treausre Dataのリージョンとエクスポート先のAWS S3バケットのリージョンが同一であるという制限事項もあります。
Toolbelt(CLI)の場合は td table:export コマンドで、Workflowでは td_table_export>:
オペレータによって実行します。
+step1: td_table_export>: database: mydb table: mytable file_format: jsonl.gz from: 2016-01-01 00:00:00 +0800 to: 2016-02-01 00:00:00 +0800 s3_bucket: my_backup_backet s3_path_prefix: mydb/mytable
最後に
いかがだったでしょうか? Jobについてふんわりとして持っていたイメージが具体化できたのであれば幸いです。