I always forget how to do this, so I'm writing it down. More and more companies are adopting Treasure Data, but unlike AWS you can't spin up your own environment for a few hundred yen (the service starts at roughly a million yen), so you can't test anything without borrowing someone else's environment, which is a bit painful from an analyst's point of view.
As for the topic in the title: most of the time data is pulled into TD from an external service such as S3, but quite a few companies still lean heavily on CSV files, so this is how to handle that case.
If you'd rather just read the official manual, use the links below. The links change from time to time (for some reason), so if they are broken, please search from the top page.
https://support.treasuredata.com/hc/en-us/articles/360001495428-Bulk-Import-from-CSV-files
https://support.treasuredata.com/
Installing TD Toolbelt
Install Treasure Data Toolbelt from the URL below. Windows, Mac, Linux, and so on are supported.
http://ybi-toolbelt.idcfcloud.com/
Initial setup: creating the td.conf file
Typing the password every time is a pain, so create a file locally and let the CLI read it.
td account -f
You'll be asked for the email address and password you use for the web console, so enter them.
Enter your Treasure Data credentials. For Google SSO user, please see https://support.treasuredata.com/hc/en-us/articles/360000720048-Treasure-Data-Toolbelt-Command-line-Interface#Google%20SSO%20Users
Email: aaa@example.com
Password (typing will be hidden):
Authenticated successfully.
A configuration file named td.conf is then created. On Windows it seems to be created directly under your user directory — presumably because putting it at the root would be risky on a machine shared by several people. How to change the save location is still a mystery to me; I'll add a note once I figure it out.
C:\Users\your_account\.td\td.conf
The contents of the file look like the following; the apikey is set to the Master key found on your account page in the Treasure Data console.
[account]
  user = info@example.com
  apikey = 1234/12341234123412341234123412341234
  endpoint = https://api.treasuredata.com
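As an aside, if I remember the toolbelt's global options correctly, you can also point a single command at a different config file or override the API key without touching td.conf at all. The flags below are from memory, not from the help output in this post, so please confirm them with td --help on your version:

td -c D:\shared\td.conf db:list    # read credentials from an alternate td.conf (path is made up for illustration)
td -k YOUR_MASTER_APIKEY db:list   # use this API key instead of the one in td.conf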
Checking the help
The help:all command lists everything. Most questions can be answered by googling, but there isn't much written about Treasure Data to begin with, and the richest material is the official English documentation, so when you're stuck it's also worth starting from the help command.
td help:all
Output ↓↓
db:list                                          # Show list of tables in a database
db:show <db>                                     # Describe information of a database
db:create <db>                                   # Create a database
db:delete <db>                                   # Delete a database
export:result <job_id> <result>                  # Export query result to the specified destination with Result URL or JSON
export:table <db> <table>                        # Export table to the specified storage
table:list [db]                                  # Show list of tables
table:show <db> <table>                          # Describe information of a table
table:create <db> <table>                        # Create a table
table:delete <db> <table>                        # Delete a table
table:import <db> <table> <files...>             # Parse and import files to a table
table:export <db> <table>                        # Dump logs in a table to the specified storage
table:swap <db> <table1> <table2>                # Swap names of two tables
table:rename <db> <from_table> <dest_table>      # rename exist table
table:tail <db> <table>                          # Get recently imported logs
table:partial_delete <db> <table>                # Delete logs from the table within the specified time range
table:expire <db> <table> <expire_days>          # Expire data in table after specified number of days. Set to 0 to disable the expiration.
table:update <db> <table>                        # Update table options
bulk_import:list                                 # List bulk import sessions
bulk_import:show <name>                          # Show list of uploaded parts
bulk_import:create <name> <db> <table>           # Create a new bulk import session to the table
bulk_import:prepare_parts <files...>             # Convert files into part file format
bulk_import:upload_part <name> <id> <path.msgpack.gz>  # Upload or re-upload a file into a bulk import session
bulk_import:upload_parts <name> <files...>       # Upload or re-upload files into a bulk import session
bulk_import:delete_part <name> <id>              # Delete a uploaded file from a bulk import session
bulk_import:delete_parts <name> <ids...>         # Delete uploaded files from a bulk import session
bulk_import:perform <name>                       # Start to validate and convert uploaded files
bulk_import:error_records <name>                 # Show records which did not pass validations
bulk_import:commit <name>                        # Start to commit a performed bulk import session
bulk_import:delete <name>                        # Delete a bulk import session
bulk_import:freeze <name>                        # Reject succeeding uploadings to a bulk import session
bulk_import:unfreeze <name>                      # Unfreeze a frozen bulk import session
import:list                                      # List bulk import sessions
import:show <name>                               # Show list of uploaded parts
import:create <name> <db> <table>                # Create a new bulk import session to the table
import:jar_version                               # Show import jar version
import:jar_update                                # Update import jar to the latest version
import:prepare <files...>                        # Convert files into part file format
import:upload <name> <files...>                  # Upload or re-upload files into a bulk import session
import:auto <name> <files...>                    # Upload files and automatically perform and commit the data
import:perform <name>                            # Start to validate and convert uploaded files
import:error_records <name>                      # Show records which did not pass validations
import:commit <name>                             # Start to commit a performed bulk import session
import:delete <name>                             # Delete a bulk import session
import:freeze <name>                             # Reject succeeding uploadings to a bulk import session
import:unfreeze <name>                           # Unfreeze a frozen bulk import session
import:config <files...>                         # create guess config from arguments
result:list                                      # Show list of result URLs
result:show <name>                               # Describe information of a result URL
result:create <name> <URL>                       # Create a result URL
result:delete <name>                             # Delete a result URL
status                                           # Show schedules, jobs, tables and results
schema:show <db> <table>                         # Show schema of a table
schema:set <db> <table> [columns...]             # Set new schema on a table
schema:add <db> <table> <columns...>             # Add new columns to a table
schema:remove <db> <table> <columns...>          # Remove columns from a table
sched:list                                       # Show list of schedules
sched:create <name> <cron> [sql]                 # Create a schedule
sched:delete <name>                              # Delete a schedule
sched:update <name>                              # Modify a schedule
sched:history <name> [max]                       # Show history of scheduled queries
sched:run <name> <time>                          # Run scheduled queries for the specified time
sched:result <name>                              # Show status and result of the last job ran.
                                                 #   --last [N] option enables to show the result before N from the last.
                                                 #   The other options are identical to those of the 'job:show' command.
query [sql]                                      # Issue a query
job:show <job_id>                                # Show status and result of a job
job:status <job_id>                              # Show status progress of a job
job:list [max]                                   # Show list of jobs
job:kill <job_id>                                # Kill or cancel a job
password:change                                  # Change password
apikey:show                                      # Show Treasure Data API key
apikey:set <apikey>                              # Set Treasure Data API key
server:status                                    # Show status of the Treasure Data server
server:endpoint <api_endpoint>                   # Set the Treasure Data API server's endpoint (must be a valid URI)
sample:apache <path.json>                        # Create a sample log file
connector:guess [config]                         # Run guess to generate connector config file
connector:preview <config>                       # Show preview of connector execution
connector:issue <config>                         # Run one time connector execution
connector:list                                   # Show list of connector sessions
connector:create <name> <cron> <database> <table> <config>  # Create new connector session
connector:show <name>                            # Show connector session
connector:update <name> [config]                 # Modify connector session
connector:delete <name>                          # Delete connector session
connector:history <name>                         # Show job history of connector session
connector:run <name> [time]                      # Run connector with session for the specified time option
workflow                                         # Run a workflow command
workflow:reset                                   # Reset the workflow module
workflow:update [version]                        # Update the workflow module
workflow:version                                 # Show workflow module version
Checking which databases you can access
db:list shows the databases you've been granted access to.
td db:list
Output ↓↓
+-----------------------+------------+
| Name                  | Count      |
+-----------------------+------------+
| your_database_a       | 73123120   |
| your_database_b       | 0          |
| your_database_c       | 134113342  |
| your_database_d       | 4162406555 |
+-----------------------+------------+
4 rows in set
td import:jar_update
We'll need it later, so update the import jar to the latest version.
td import:jar_update
Creating the database and table
If they already exist you can skip this, but otherwise prepare the empty boxes as needed.
td db:create test_db
td table:create test_db test_tb
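If you want to make sure the empty boxes really exist, the table commands from the help:all list above work as a quick sanity check:

td table:list test_db            # the new table should show up with a count of 0
td table:show test_db test_tb    # describes the (still empty) table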
In a normal DWH you would usually define column names and types up front at this point, but with TD that isn't needed yet. You'll get a "created." message, so move on.
Importing CSV – checking the options
Whatever the platform, you'll pass a few options as arguments when importing. Let's look them up with the help command.
td help import:auto
td help import:prepare
For import:auto ↓↓
usage:
  $ td import:auto <session name> <files...>

example:
  $ td import:auto mysess parts/* --parallel 4
  $ td import:auto mysess parts/*.csv --format csv --columns time,uid,price,count --time-column time -o parts/
  $ td import:auto parts/*.csv --auto-create mydb.mytbl --format csv --columns time,uid,price,count --time-column time -o parts/
  $ td import:auto mysess mytable --format mysql --db-url jdbc:mysql://localhost/mydb --db-user myuser --db-password mypass
  $ td import:auto "s3://<s3_access_key>:<s3_secret_key>@/my_bucket/path/to/*.csv" --format csv --column-header --time-column date_time -o parts/

description:
  Automatically upload or re-upload files into a bulk import session.
  It's functional equivalent of 'upload' command with 'auto-perform', 'auto-commit' and 'auto-delete' options.
  But it, by default, doesn't provide 'auto-create' option.
  If you want 'auto-create' option, you explicitly must declare it as command options.

options:
  --retry-count NUM                upload process will automatically retry at specified time; default: 10
  --auto-create DATABASE.TABLE     create automatically bulk import session by specified database and table names
                                   If you use 'auto-create' option, you MUST not specify any session name as first argument.
  --parallel NUM                   upload in parallel (default: 2; max 8)
  -f, --format FORMAT              source file format [csv, tsv, json, msgpack, apache, regex, mysql]; default=csv
  -C, --compress TYPE              compressed type [gzip, none, auto]; default=auto detect
  -T, --time-format FORMAT         specifies the strftime format of the time column
                                   The format slightly differs from Ruby's Time#strftime format in that
                                   the '%:z' and '%::z' timezone options are not supported.
  -e, --encoding TYPE              encoding type [UTF-8, etc.]
  -o, --output DIR                 output directory. default directory is 'out'.
  -s, --split-size SIZE_IN_KB      size of each parts (default: 16384)
  -t, --time-column NAME           name of the time column
  --time-value TIME,HOURS          time column's value. If the data doesn't have a time column, users can
                                   auto-generate the time column's value in 2 ways:
                                   * Fixed time value with --time-value TIME: where TIME is a Unix time in
                                     seconds since Epoch. The time column value is constant and equal to TIME
                                     seconds. E.g. '--time-value 1394409600' assigns the equivalent of
                                     timestamp 2014-03-10T00:00:00 to all records imported.
                                   * Incremental time value with --time-value TIME,HOURS: where TIME is the
                                     Unix time in seconds since Epoch and HOURS is the maximum range of the
                                     timestamps in hours. This mode can be used to assign incremental
                                     timestamps to subsequent records. Timestamps will be incremented by
                                     1 second each record. If the number of records causes the timestamp to
                                     overflow the range (timestamp >= TIME + HOURS * 3600), the next timestamp
                                     will restart at TIME and continue from there.
                                     E.g. '--time-value 1394409600,10' will assign timestamp 1394409600 to the
                                     first record, timestamp 1394409601 to the second, 1394409602 to the third,
                                     and so on until the 36000th record which will have timestamp 1394445600
                                     (1394409600 + 10 * 3600). The timestamp assigned to the 36001th record will
                                     be 1394409600 again and the timestamp will restart from there.
  --primary-key NAME:TYPE          pair of name and type of primary key declared in your item table
  --prepare-parallel NUM           prepare in parallel (default: 2; max 96)
  --only-columns NAME,NAME,...     only columns
  --exclude-columns NAME,NAME,...  exclude columns
  --error-records-handling MODE    error records handling mode [skip, abort]; default=skip
  --invalid-columns-handling MODE  invalid columns handling mode [autofix, warn]; default=warn
  --error-records-output DIR       write error records; default directory is 'error-records'.
  --columns NAME,NAME,...          column names (use --column-header instead if the first line has column names)
  --column-types TYPE,TYPE,...     column types [string, int, long, double]
  --column-type NAME:TYPE          column type [string, int, long, double]. A pair of column name and type can
                                   be specified like 'age:int'
  -S, --all-string                 disable automatic type conversion
  --empty-as-null-if-numeric       the empty string values are interpreted as null values if columns are numerical types.

CSV/TSV specific options:
  --column-header                  first line includes column names
  --delimiter CHAR                 delimiter CHAR; default="," at csv, "\t" at tsv
  --escape CHAR                    escape CHAR; default=\
  --newline TYPE                   newline [CRLF, LF, CR]; default=CRLF
  --quote CHAR                     quote [DOUBLE, SINGLE, NONE]; if csv format, default=DOUBLE. if tsv format, default=NONE

MySQL specific options:
  --db-url URL                     JDBC connection URL
  --db-user NAME                   user name for MySQL account
  --db-password PASSWORD           password for MySQL account

REGEX specific options:
  --regex-pattern PATTERN          pattern to parse line. When 'regex' is used as source file format, this option is required
For import:prepare ↓↓
usage:
  $ td import:prepare <files...>

example:
  $ td import:prepare logs/*.csv --format csv --columns time,uid,price,count --time-column time -o parts/
  $ td import:prepare logs/*.csv --format csv --columns date_code,uid,price,count --time-value 1394409600,10 -o parts/
  $ td import:prepare mytable --format mysql --db-url jdbc:mysql://localhost/mydb --db-user myuser --db-password mypass
  $ td import:prepare "s3://<s3_access_key>:<s3_secret_key>@/my_bucket/path/to/*.csv" --format csv --column-header --time-column date_time -o parts/

description:
  Convert files into part file format

options:
  -f, --format FORMAT              source file format [csv, tsv, json, msgpack, apache, regex, mysql]; default=csv
  -C, --compress TYPE              compressed type [gzip, none, auto]; default=auto detect
  -T, --time-format FORMAT         specifies the strftime format of the time column
                                   The format slightly differs from Ruby's Time#strftime format in that
                                   the '%:z' and '%::z' timezone options are not supported.
  -e, --encoding TYPE              encoding type [UTF-8, etc.]
  -o, --output DIR                 output directory. default directory is 'out'.
  -s, --split-size SIZE_IN_KB      size of each parts (default: 16384)
  -t, --time-column NAME           name of the time column
  --time-value TIME,HOURS          time column's value. If the data doesn't have a time column, users can
                                   auto-generate the time column's value in 2 ways:
                                   * Fixed time value with --time-value TIME: where TIME is a Unix time in
                                     seconds since Epoch. The time column value is constant and equal to TIME
                                     seconds. E.g. '--time-value 1394409600' assigns the equivalent of
                                     timestamp 2014-03-10T00:00:00 to all records imported.
                                   * Incremental time value with --time-value TIME,HOURS: where TIME is the
                                     Unix time in seconds since Epoch and HOURS is the maximum range of the
                                     timestamps in hours. This mode can be used to assign incremental
                                     timestamps to subsequent records. Timestamps will be incremented by
                                     1 second each record. If the number of records causes the timestamp to
                                     overflow the range (timestamp >= TIME + HOURS * 3600), the next timestamp
                                     will restart at TIME and continue from there.
                                     E.g. '--time-value 1394409600,10' will assign timestamp 1394409600 to the
                                     first record, timestamp 1394409601 to the second, 1394409602 to the third,
                                     and so on until the 36000th record which will have timestamp 1394445600
                                     (1394409600 + 10 * 3600). The timestamp assigned to the 36001th record will
                                     be 1394409600 again and the timestamp will restart from there.
  --primary-key NAME:TYPE          pair of name and type of primary key declared in your item table
  --prepare-parallel NUM           prepare in parallel (default: 2; max 96)
  --only-columns NAME,NAME,...     only columns
  --exclude-columns NAME,NAME,...  exclude columns
  --error-records-handling MODE    error records handling mode [skip, abort]; default=skip
  --invalid-columns-handling MODE  invalid columns handling mode [autofix, warn]; default=warn
  --error-records-output DIR       write error records; default directory is 'error-records'.
  --columns NAME,NAME,...          column names (use --column-header instead if the first line has column names)
  --column-types TYPE,TYPE,...     column types [string, int, long, double]
  --column-type NAME:TYPE          column type [string, int, long, double]. A pair of column name and type can
                                   be specified like 'age:int'
  -S, --all-string                 disable automatic type conversion
  --empty-as-null-if-numeric       the empty string values are interpreted as null values if columns are numerical types.

CSV/TSV specific options:
  --column-header                  first line includes column names
  --delimiter CHAR                 delimiter CHAR; default="," at csv, "\t" at tsv
  --escape CHAR                    escape CHAR; default=\
  --newline TYPE                   newline [CRLF, LF, CR]; default=CRLF
  --quote CHAR                     quote [DOUBLE, SINGLE, NONE]; if csv format, default=DOUBLE. if tsv format, default=NONE

MySQL specific options:
  --db-url URL                     JDBC connection URL
  --db-user NAME                   user name for MySQL account
  --db-password PASSWORD           password for MySQL account

REGEX specific options:
  --regex-pattern PATTERN          pattern to parse line. When 'regex' is used as source file format, this option is required
Importing with import:prepare
Now let's upload several CSV files sitting on the local machine. For TSV, you simply say so in the format option.
Step 1: create a session
Define which table in which DB you're importing into, under an arbitrary session name (t_session). Think of it as something like a variable.
td import:create t_session your_db_name test_tb
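If you want to confirm the session was actually registered before preparing any data, the session commands from the help:all list above can be used, roughly like this:

td import:list              # lists existing bulk import sessions
td import:show t_session    # lists the parts uploaded to this session (none yet at this point)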
Step 2: prepare the data
TD won't just "figure out" a raw CSV on its own, so the data has to be converted locally first. Running the following produces .msgpack.gz part files (by default under an out directory).
Also, because of Treasure Data's design you can't import data without a time column, so if your data doesn't have one, generate it at prepare time.
td import:prepare *.csv --format csv --column-header --encoding utf-8 --all-string --time-value 1589382000,10
There are several ways to specify --time-value; this time I passed a Unix timestamp directly as the first value. You can use an online converter, or on Windows compute it in PowerShell by subtracting the Unix epoch from the current time (shown below).
Column types can also be specified individually with the --column-types option.
This time, to keep things simple, I loaded everything as strings with --all-string and CAST in SQL later where needed (see the sketch at the end of this step).
$UNIX_EPOCH = Get-Date("1970/1/1 0:0:0 GMT")
(Get-Date) - $UNIX_EPOCH

Days              : 18397
Hours             : 22
Minutes           : 9
Seconds           : 30
Milliseconds      : 610
Ticks             : 15895805706103588
TotalDays         : 18397.9232709532
TotalHours        : 441550.158502877
TotalMinutes      : 26493009.5101726
TotalSeconds      : 1589580570.61036
TotalMilliseconds : 1589580570610.36
* TotalSeconds is the current Unix time (use the integer part).
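For what it's worth, on macOS/Linux (or in WSL / Git Bash on Windows) the same value comes out of the standard date command in one line — nothing TD-specific here:

date +%s    # current Unix time in seconds, e.g. 1589580570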
The second value (the HOURS part) is the range in hours over which the generated timestamps cycle; since TD partitions data by time, it effectively determines how many one-hour partitions the records get spread across. If you're wondering what value is optimal, by all means run the experiment.
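For reference, here is a sketch of what the same prepare step might look like for a tab-separated file with explicit per-column types instead of --all-string. The file name, column names, and types are made up for illustration; the options themselves are taken straight from the import:prepare help above.

# hypothetical sales.tsv with a header row: sales_date, uid, price, count
td import:prepare sales.tsv --format tsv --column-header --encoding utf-8 --column-types string,string,long,long --time-value 1589382000,10 -o parts/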
Step 3: upload
Exactly what it says: upload. The destination is the session name defined in Step 1.
td import:upload t_session 'out\samplecsv.msgpack.gz'
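prepare tends to produce more than one part file when the input is large or when several CSVs are converted at once, and import:upload accepts multiple files, so they can all be sent in one go. Whether the wildcard below gets expanded depends on your shell, so adjust as needed:

td import:upload t_session out\*.msgpack.gz    # upload every part file under out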
Step 4: import
Once the upload is done, run the import (perform).
td import:perform t_session
The commit comes after this, but the perform job will still be running, so wait and commit only once everything has finished.
You can check the progress with the td job:show command; passing one of the short options below as an argument gives you more detail.
options:
  -v, --verbose         show logs
  -w, --wait            wait for finishing the job
  -G, --vertical        use vertical table to show results
  -o, --output PATH     write result to the file
  -l, --limit ROWS      limit the number of result rows shown when not outputting to file
  -c, --column-header   output of the columns' header when the schema is available for the table (only applies to tsv and csv formats)
  -x, --exclude         do not automatically retrieve the job result
  --null STRING         null expression in csv or tsv
  -f, --format FORMAT   format of the result to write to the file (tsv, csv, json, msgpack, and msgpack.gz)
td job:show <job_id>
td job:show 123456789
JobID       : 123456789
Status      : success
Type        : bulk_import_perform
Database    : XXXXXXX
Destination : -- LOAD DATA SESSION XXXXXXXXXXXXXXXXXXXXXXXXXXXX
Use '-v' option to show detailed messages.
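If you'd rather not poll by hand, the -w flag from the option list above makes job:show block until the job finishes, and -v adds the detailed log:

td job:show 123456789 -w -v    # wait for the perform job to finish and show its logs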
Once the Status changes to success, commit.
Step 5: commit
Finally, the commit.
td import:commit t_session
That should leave the CSV data in the table you specified.
You can inspect the table from the terminal as well, but it's not a great experience, so check in the web console or a SQL client with a LIMIT clause that the data was imported the way you wanted.
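If you do want a quick peek without leaving the terminal, a query along these lines should work — a sketch, assuming -d picks the database and (if I remember right) -T picks the query engine, so adjust to your environment and to whichever database/table you actually imported into:

td query -w -d test_db -T presto "SELECT * FROM test_tb LIMIT 10"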
That's it.