GCSからBigQueryへCSVデータが入ったタイミングで自動で転送する(Cloud Functions)

目的

Cloud StorageにCSVデータが入ったら、データ処理して、自動でBigQueryにデータが流れるようにしたい

使用するツールと構図

Cloud Functionsを使います。Cloud Functionsについて知らない方は以下を参照してください。

Cloud SchedulerとCloud Functionsを使って自動スクレイピングさせて、スプレッドシートに書き出してみた(Python)

Google先生で調べたら同じようなことをやろうとしている方も多く、たくさんの記事がありました。

こちらの記事を参考にさせていただきました。https://nishipy.com/archives/765

作成の流れは以下です。

  1. Cloud Storageにバケットを作成する
  2. BigQueryにデータセットとテーブルを用意する
  3. Pythonファイルを作成する
  4. Cloud Functionsにデプロイする
  5. テストする

開発環境にCloud Shellを使って、すべてコマンドで処理します。

1.Cloud Storageにバケットを作成する

Cloud Shellから以下のコマンドを叩いて、Cloud Storageにバケットを作成します。バケット名は名前を変更して作成してください。

gsutil mb gs://gcs-gcf-load-1124

2.BigQueryにデータセットとテーブルを用意する

以下のコマンドでBigQueryにデータセットを作成します。

bq mk --dataset gcs_gcf

テーブル作成の前にschema.jsonファイルを作成してください。

中身は以下になります。

[
  {
    "mode": "NULLABLE",
    "name": "date",
    "type": "DATE"
  },
  {
    "mode": "NULLABLE",
    "name": "channel_grouping",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "medium",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "sessions",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "pageviews",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "bounces",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "session_duration",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "exits",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "users",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "new_users",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "goal1_completions",
    "type": "INTEGER"
  }
]

以下コマンドで空のテーブルを作成します。

bq mk --table --schema=schema.json --time_partitioning_field=date gcs_gcf.import_data

3.Pythonファイルを作成する

今回はPythonを使って作成していきます。

Cloud Shellでフォルダを作成し、main.pyとrequirements.txtファイルを用意します。

mkdir gcs-gcf
cd gcs-gcf

main.pyのファイルの中身は以下になります。

from google.cloud import bigquery

def load_data(data, context):
    client = bigquery.Client()
    project_id = 'プロジェクトIDをいれてください'
    dataset_id = 'gcs_gcf'

    bucket_name = data['bucket']
    file_name = data['name']
    file_ext = file_name.split('.')[-1]

    if file_ext == 'csv':
        uri = 'gs://' + bucket_name + '/' + file_name
        table_id = 'import_data'
        dataset_ref = client.dataset(dataset_id)
        table_ref = dataset_ref.table(table_id)

        #load config
        job_config = bigquery.LoadJobConfig()
        job_config.skip_leading_rows = 1
        job_config.source_format = bigquery.SourceFormat.CSV

        #load data
        load_job = client.load_table_from_uri(
            uri,
            table_ref,
            job_config=job_config
        )
        print('Started job {}'.format(load_job.job_id))
        load_job.result()
        print('Job finished.')
        destination_table = client.get_table(dataset_ref.table(table_id))
        print('Loaded {} rows.'.format(destination_table.num_rows))

    else:
        print('Nothing To Do')

スクリプトを簡単に説明すると、load_data関数を作成し、引数にdataとcontextを設定します。dataにstorageの情報がはいってきます。google cloud bigqueryのパッケージを読み取り、変数clientにbigquery情報を入れ込みます。functionsをデプロイするときに指定したバケットにcsvデータが入った場合のみcsvデータの1行目をスキップ、formatをCSVでbigQueryの対象テーブルに追記されるようになります。(データの追加方法は、「追記」がデフォルトになります。job_config.write_disposition = ‘WRITE_APPEND’)

続いて、外部パッケージを認識させるためにrequierments.txtに以下のコードを追記します。

google-cloud-bigquery == 2.20.0

以下のURLにGCPのPythonパッケージリストがあります。なるべく最新のバージョンをとってくることを推奨します。

https://cloud.google.com/python/references/libraries?hl=ja

4.Cloud Functionsにデプロイする

以下コマンドでCloud Functionsにデプロイします。

gcloud functions deploy load_data --runtime python37 --trigger-resource バケット名をいれてください --trigger-event google.storage.object.finalize

trigger-resourceで対象のバケット名を指定し、trigger-eventでstorage.object.finalizeを指定してstorageにファイルが入ってきた時または更新されたときという設定でデプロイします。

またCloud Storageのトリガーには作成の他に、削除、アーカイブ、メタデータ更新でもイベント設定ができます。

詳しくは以下を参照してください。

https://cloud.google.com/functions/docs/calling/storage?hl=ja

デプロイ完了には数分かかります。

以下のような表示で終わるとデプロイ完了です。

~
status: ACTIVE
timeout: 60s
updateTime: '2021-07-07T16:27:56.446Z'
versionId: '1'

5.テストする

以上で設定完了です。最後に問題なく動くかテストしてみましょう。

以下のcsvファイルをダウンロードして、Cloud Storageにアップロードしてみましょう。

Cloud Functionsのログからエラーがでていないか確認してみましょう。

上記画像のようにアイコンが青であれば、エラーなく実行できております。

このようにびっくりマークアイコンがでていたらエラーになっています。ログを確認して修正し、再度デプロイしてみてください。

最後にBigQueryにデータがはいっているか確認しましょう。

問題なくはいってました。

まとめ

GCSからBigQueryへ自動で取り込む方法について、Cloud Functionsを使い、Pythonで実装してみました。

これができるようになると、例えば営業の人がまとめているデータをCloud Storageに好きなときにアップロードしてもらうだけで、自動でデータをきれいにして、BigQueryにデータが集まる仕組みを作ることができます。

毎回営業からcsvデータもらって整形して、csvをBigQueryにあげるというのを手作業でやっていたらきりがないですよね。

ぜひみなさんも作ってみてください。

この記事で使用したスクリプト

https://github.com/takuma11248250/gcs-gcf