Treasure Data - Support Engineering Team blog

トレジャーデータのサポートエンジニアリングチームのブログです。

Jobの種類について

こんにちは、Treasure Dataサポートの伊藤です。

本記事では、Treasure Dataの要と言っても過言ではない Job について詳細見ていきます。

Jobとは

Treasure Dataのコンポーネントとして、Jobというものがあります。 TDコンソールの Job Activities にて確認できるもの、とイメージいただくと良いかと思います。

Job は処理の単位を表し、その種類ごとに目的や仕様などが変わります。下記概要になりますが、各種類について詳細は後述するので気になる方はご参照ください。

Jobの種類 Job Activities上のラベル 処理内容概要
Hive Hive クエリエンジンHiveでSQLを実行したJob
Presto Presto クエリエンジンPrestoでSQLを実行したJob
Partial Delete Partial Delete td table:partial_deleteコマンドによって実行されるPrestoのDELETE文のJob
Result Export Result Export HiveやPrestoによって抽出した結果をエクスポートするためのJob
Bulk Load Data Import Data Connector(Source, Workflowのtd_load>:オペレータ)によるデータインポートのためのJob
Bulk Import Bulk Import Job td import:autoコマンドや、Embulkのembulk-output-tdプラグインによるデータ投入のJob
Bulk Export Export td table:exportコマンドなどによるAWS S3へテーブルをエクスポートするためのJob

Hive Job

Treasure Dataはテーブルとして格納されているデータをSQLで抽出することができます。その際にクエリエンジンとして2種類選択肢があり、そのうちの1つが Hive となります。

下記手順でSQLを記載して実行する画面に遷移できますが、 Type にて Hive 2020.1 (stable) を選択して実行した際に、Hive Jobが発行されます。

Data Workbench(歯車アイコン) --> Queries --> New Query ボタン

Hiveを選択する

Workflowから Hive Job を実行することもできます。下記例のように td>: オペレータを engine: hive オプションとともに利用すると Hive Job が実行されます。

+run_hive_query:
  td>:
  query: "SELECT col1, col2 FROM test_table;"
  engine: hive
  engine_version: stable
  database: test_db

他にも Hive Job を実行する手段はいくつかありますが、今回は割愛します。

Presto Job

SQLにてデータ抽出する際に利用できるクエリエンジンのもうひとつの選択肢が Presto です。 先述したクエリ実行画面にて TypePresto を指定して実行した際に実行されます。

Prestoを選択する

Hive Job と同様に、Presto Job も Workflow から実行することができます。 下記例のように engine: presto オプションを指定すると Presto Jobが実行されます。 (engineオプションのデフォルト値は presto のため、本プションを省略した場合も Presto Job が実行されます)

+run_presto_query:
  td>:
  query: "SELECT col1, col2 FROM test_table;"
  engine: presto
  database: test_db

Presto Jobの場合も他の手段で実行することは可能ですが割愛します。

Partial Delete Job

Treasure Dataには Toolbelt と呼ばれるCLIツールがあります。このツールで td table:partial_delete コマンドを実行すると、指定したテーブルのレコードの一部を削除するための Job である Partial Delete Job が実行されます。

$ td table:partial_delete test_db test_table --from 1568876400 --to 1568880000

Partial Delete Job と言っても実体は Presto Job で、例えば上記コマンドで実行されるジョブは下記クエリ文をPrestoで実行しています。

DELETE FROM test_table WHERE 1568876400 <= time AND time < 1568880000

Workflowで実行することもでき、下記サンプルのように td_partial_delete>: オペレータを使用します。

+step1:
  td_partial_delete>: test_table
  database: test_db
  from: 2016-01-01T00:00:00+09:00
  to:   2016-02-01T00:00:00+09:00

※ Partial Deleteリリース当初はPrestoのDELETE文でレコード削除することはできなかったため現在のような状況となっていますが、現時点ではDELETE文で柔軟に削除対象レコードを指定して削除することが可能です

Result Export Job

SQL(Hive/Presto)で抽出した結果を外部サービス(AWS S3やSFTPサーバーなど)へエクスポートすることが可能です。 この機能を利用する際に実行される Job を Result Export Job と呼びます。

Export Resultsチェックボックスにチェックを入れてエクスポート先を設定していると、SQLを実行する際に抽出結果を設定した宛先にエクスポートします。

Export Results チェックボックス

あまりこの Result Export Job 自体を意識することはないかと思いますが、 Job Activities でクエリ文が RESULT EXPORT FROM JOB <HiveかPresto JobのID> となっているジョブが該当します。クエリ文にエクスポートするデータを抽出した際の job_id が記載されているので、どのデータをエクスポートしようとしているのかを確認することができます。

クエリJobとResult Export Job

また、Workflowから Result Export Job を実行するには下記のようなサンプルになります。 Hive/Presto Jobを実行する必要があるので td>: オペレータを利用します。エクスポートするための設定は result_connection:result_settings: オプションで行い、 result_connection: にはAuthentication名を、result_settings: にはエクスポート先情報を指定することになります。

+run_presto_query:
  td>:
  query: "SELECT col1, col2 FROM test_table;"
  engine: presto
  database: test_db
  result_connection: test_authentication_s3
  result_settings:
    bucket: test-bucket
    path: /202205/test.csv
    header: true

Bulk Load Job

外部サービス(例えばAWS S3やBigQueryなど)からTreasure Dataへインポートする際に実行される Job が Bulk Load です。

TDコンソールでこの Job を利用するには、 Source と呼ばれる機能を利用します。既存のSourceは下記で確認することができます。

Integrations Hub --> Sources

新たにSourceを作成するには、事前に Authentication と呼ばれる資格情報(ユーザー名やパスワード、秘密鍵鍵など)を設定するためのコンポーネントを作成しておく必要があります。 下記から連携先サービスを選択し、Authenticationを作成します。

Integrations Hub --> Catalog

Integrations Hub --> Catalog

Workflowから Bulk Load Job を実行する場合は td_load>: オペレータを利用します。 yamlファイルにインポートするための設定を記述し、 td_load>: オペレータでyamlファイルを指定する形で利用するケースが多いでしょう。

+load:
  td_load>: config/daily_load.yml
  database: ${td.dest_db}
  table: ${td.dest_table}

config/daily_load.yml

in:
  type: s3
  access_key_id: ${secret:s3.access_key_id}
  secret_access_key: ${secret:s3.secret_access_key}
  bucket: test-bucket
  path_prefix: 20220725_test
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    skip_header_lines: 1
    columns:
    - name: id
      type: long
...

td connector:issue コマンドなど他の手段で実行することが可能な Job の種類ですが、今回は割愛します。

Bulk Import Job

Treasure Dataへインポートする際に実行されるJobの1種が Bulk Import Job です。 先述した Bulk Load Job とは名前が似ていますが全く別の種類のJobとなります。

下記のようなケースで発行される Job になり、その名の通りTreasure Dataへレコードをインポートします。

Bulk Export Job

Treasure Dataのテーブルを AWS S3 へエクスポートするという限られた目的のためのJobが Bulk Export Job です。 外部サービスにエクスポートするという点では Result Export Job と同じですが、それとは異なりBulk Export JobはSQLで抽出した結果をエクスポートするわけではありません。 また、Treausre Dataのリージョンとエクスポート先のAWS S3バケットのリージョンが同一であるという制限事項もあります。

Toolbelt(CLI)の場合は td table:export コマンドで、Workflowでは td_table_export>: オペレータによって実行します。

+step1:
  td_table_export>:
  database: mydb
  table: mytable
  file_format: jsonl.gz
  from: 2016-01-01 00:00:00 +0800
  to:   2016-02-01 00:00:00 +0800
  s3_bucket: my_backup_backet
  s3_path_prefix: mydb/mytable

最後に

いかがだったでしょうか? Jobについてふんわりとして持っていたイメージが具体化できたのであれば幸いです。