snowflakeのデータロード
snowflakeでデータをまとめて入れる場合、そのデータはまずファイルに出力する必要がある。
その後、ファイルをクラウドストレージにステージングし、
最後に、COPY INTO
コマンドを用いて、クラウドストレージからsnowflakeにロードする。
コマンドの流れは以下の通り
CREATE STAGE
CREATE FILE FORMAT
CREATE TABLE
COPY INTO TABLE
ファイルのフォーマットについて
ロード/アンロード中にファイルを解析するために、ファイルフォーマットを設定する必要がある。
フォーマットは、COPY INTO
コマンドだけでなく、CREATE FILE FORMAT
コマンドでオブジェクトとして作成しておくこともできる。
例えば - CSV形式で - ,区切りを指定して - 1行スキップする
という条件のファイルを読み込みたい場合。
CREATE FILE FORMAT DEMO_FF TYPR = 'CSV' FILELD_DELIMITER = ',' SKIP_HEADER = 1;
フォーマットは、CSVの他にも、JSONフォーマットなどもある。
ステージについて
ストレージとは、バケットのようなファイル置き場のことである。
COPY INTO
コマンドでステージに突っ込むことが可能。
ステージには3つほど種類がある。
- テーブルステージ
@%テーブル名
- テーブルに紐づいているステージである。
%
の後にテーブル名を指定することで、そのテーブル名のステージにファイルを置くことができる。
- ユーザーステージ
@~
- これはユーザーに紐ずくステージである。
- (KKING)ユーザーが存在すれば、KIINGステージも存在する
- 他人のステージを指定することはできない。
これらは自動で作成されるステージでである。snowflake内部に存在するステージであり、ファイルフォーマットの設定をサポートしていない。
- ネームドステージ
一括ロードの概要
一括ロードとは、バッチ処理のこと
from https://docs.snowflake.com/ja/user-guide/data-pipelines-intro.html
手順としては以下の通り
- ローカルにあるファイルからsnowflakeへ
PUT
コマンドで、内部ストレージへ移動する。 - 内部ステージから、COPY INTOする。
- または、外部ステージ、クラウドストレージからCOPY INTOする
これらの処理を、バッチ処理として行う
注意点としては、snowflakeのweb UIからは実行できないという点。
ローカルストレージからバッチ処理を行う
名前つき内部ステージを作成。
CREATE STAGE my_stage FILER_FOERMAT = my_csv_format;
データをステージにPUT。
PUT FILE:///data/data.csv @my_stage
データをテーブルにコピー。
COPY INTO my_table FROM @my_stage
クラウドストレージからバッチ処理を行う
- 外部ステージを作成
create stage my_s3_stage storage_integration = s3_int url = 's3://mybucket/encrypted_files/' file_format = my_csv_format;
- COPYを使用してデータをロードする
COPY INTO my_table FROM @my_s3_stage PATTERN='.sales.*.csv'
COPY INTOコマンドの代表的なオプション
FILES
[ FILES = ( '<file_name>' [ , '<file_name>' ] [ , ... ] ) ]
ファイルをカンマ区切りで指定するオプション
PATTERN
[ PATTERN = '<regex_pattern>' ]
ファイルを正規表現で指定するオプション
FORCE
FORCE = TRUE | FALSE
以前にロードされたかどうか、ロード後に変更があったかどうかに関係なく、すべてのファイルをロードするよう指定するオプション
PURGE
PURGE = TRUE | FALSE
データが正常にロードされた後、ステージからデータファイルを自動的に削除するかどうかを指定するブール値。
COPY INTOで変換しながらコピーする
COPY INTOコマンドは、SLECT文を用いた変換が可能である。 結合やフィルターや集計は行い得ないが、ステージ上でできる範囲でできる。
COPY INOT home_sales (city, zip, sale_data, price) FROM ( SELECT SUBSTR(t,$2,4), t.&1, t.$5, t.$4 FROM $my_stage t ) FILR_FOERMAT = (FORMAT_NAME = format_csv)
COPY INTOのON__ERROEオプション
データロード次のエラーの処理方法を制御することが可能
ON_ERROE=CONTINUE
: エラーが見つかった場合は、ファイルのロードを続行します。エラーが見つかった行はロードされません。ON_ERROE=SKIP_FILE
: エラーが見つかった場合はファイルをスキップします。
COPY INTOの過去の実行で、ロード中に発生したエラーを確認する
ON_ERROE=CONTINUE
で全てのロードが完了した後、どの行がエラーが発生したのかを確認することができる。
select * from table(VALIDATE(mytable, job => '<query_id>'))
VALIDATION_MODEを使用して、検証モードでコピーする
COPY INTOした際のシミレーションが可能。
COPY INTO my_table FROM @my_stage/mylife.csv.gz VALIFATION_MODE=return_all_errors;
ただし、SELECT文での変換を検証することはできない。
snowflakeのストリーム設定方法
snowflakeにはsnowpipe
という機能が存在する。
これは一括でロードを行うCOPY
コマンドと異なり、継続的なストリームとしてデータをファイルから読み込むことが可能である。
- ステージにファイルをのっける(S3など)(ファイルを乗っける方法はそれぞれのアプリケーションで行う)
- ステージにファイルが上がると、
インジェスションキュー
にデータが入り、 - 順次、Snowpipeがインジェスションキューにあるデータをテーブルに導入する
REST APIを使用する方法
pythonなどを使用したプログラムからのストリーミングには、REST APIを使用することをお勧めする。 流れとしては以下の通り。
- RESTを呼び出し、ファイル名を指定する。
- (RESTの呼び出し方はプログラムなどで、pythonなどから呼び出せる。)
- 以下のコードで作成したSnowpipeがデータをテーブルにロードしてくれる
CREATE PIPE IF NOT EXSTS mypipe AS COPY INTO mytable FROM @mystage;
外部ステージでも、内部ステージでも使えるのがメリット
AUTO-INGESTを使用する方法
外部ステージでしか使えない方法(クラウドベンダーを使用する方法なので)
- Apache kafkaを使用して、外部ストレージにファイルをアップロードする。or FIREHOUSEなどを使う
- 外部ステージから、通知を飛ばす(飛ばす方法は、AWSなどのSQSなど)
- 次のコマンドで作成したSnowpipeがその通知を検知して、snowflake pipeがテーブルを入れる。
CREATE PIPE.public.mypipe AUTO_INGEST=TRUE AS COPY INTO mytable FROM @mystage;
snowpipeのロード順について
通常はファイルを入れた順に処理が開始される。 ただし、サイズの大きいファイルを先に入れたとしても、後から入れた小さいファイルが先に処理が完了してしまう可能性がある
snowpipeの推奨事項
ステージファイルは1分に一回を推奨される
ファイルを管理するためのオーバーヘッドは、キューに入れられたファイルの数に関連して増加する。
ファイルを入れれば入れるほど、大量の時間がかかってしまう。
snowpipeの請求について
サーバーレスモデルについては、仮想ウェアハウスは必要ない。 snowflakeがコンピュートリソースを提供及び管理する。
アカウントは実際のコンピュート使用料に基づいて課金される。 コアごとに課金されるので、ウェアハウスの一時停止について心配する必要などはない。
参考:REST呼び出しまたは自動取り込みを介して通知された1000ファイルあたり0.06クレジットの使用コスト
尾行
title:snowflakeのバッチ処理とストリーム処理【snowflake解説】
category_script:page_name.startswith("7")
img:https://paya02.com/wp-content/uploads/2018/09/icon25620991.png