Dataflowを使ってリアルタイム分析基盤を構築する(Python)

この記事は以下の記事の続編になります。

【さわってみよう!】Google Cloud Dataflowって何に使うの?

また、以下の書籍の第10章を参考にまとめております。

目的

  • Dataflowの構造について理解する
  • Python(Apache Beam)を使ってリアルタイムデータ分析基盤を構築する

この記事で説明しないこと

  • Python(Apache Beam)コードの具体的な書き方や工夫した方がいいこと

Dataflowは「Apache Beam」というフレームワークを使ってコードを書いていくのですが、細かい説明は省きます。細かい記載の仕方はこちらの記事を参照ください。

https://qiita.com/hayatoy/items/987658490a69c7d24635

Dataflowの構造について

使用するフレームワークについて

Google Cloud Dataflowは「Apache Beam」というオープンソースのデータ処理フレームワークを使って処理を行っています。Apache Beamというのは、バッチ処理とストリーミング処理を同じようなコードで実行でき、BeamはBatchとStreamingの合わせた言葉だそうです。

Apache Beamは、Python、Java、GOを使うことができ、Pythonをよく使う人が多い?印象です。

パイプラインについて

Dataflowではデータを読み込んで吐き出すまでの一連の流れを「パイプライン」とよびます。

パイプラインの中に「PCollection」「Transform」があります。

イメージにすると以下のような感じです。

PCollectionというのは、データそのもののことを指します。

Transformというのは、データを処理することを指します。

要するに、データ(PCollection)を処理(Transform)して、別のデータ(PCollection)を作成する。完成したデータ(PCollection)をBigQueryなどのデータベースに保存する。という意味になります。

この一連の流れを冒頭でもお話したとおり「パイプライン」と呼びます。

コードで書くときも同じようにデータ読み込み→PCollection→Transform→データ書き込みの順番で進んでいくので、可読性の高いコードで実装することができます。

構築の流れ

今回使用するデータは、公開データの「ニューヨークのタクシー位置情報」を使用していきます。開発環境は、Cloud Shellを使用します。流れは以下です。

  1. Pub/Subでサブスクリプション作成
  2. BigQueryでデータセットとテーブルの作成
  3. Pythonファイルの確認(nyc_taxi_streaming_analytics1.py)
  4. Dataflowの実行、BigQueryデータ確認
  5. BigQueryに別のテーブルを作成
  6. Pythonファイルの確認(nyc_taxi_streaming_analytics2.py)
  7. Dataflowの実行、BigQueryデータ確認
  8. DataPortalで可視化
  9. 環境のリセット

0.git clone

今回使うファイルがあるgitをcloneしておきます。

https://github.com/ghmagazine/gcpdataplatformbook

1.Pub/Subでサブスクリプション作成

公開データの「ニューヨークのタクシー位置情報」をPULLする設定をします。

以下のコードで設定します。

gcloud pubsub subscriptions create streaming-taxi-rides --topic=projects/pubsub-public-data/topics/taxirides-realtime

以下のコマンドでデータがとってこれるようになっているか確認してください。(取得できるようになるまで数分かかります)

gcloud pubsub subscriptions pull projects/$(gcloud config get-value project)/subscriptions/streaming-taxi-rides --format="value(message.data)" | jq

このようなjsonの形でデータが取得できるようになっています。このデータをBigQueryにいれるためにDataflowで前処理してあげて、構造化データにしていきます。

2.BigQueryでデータセットとテーブルの作成

以下コードでデータセットとテーブルを作成していきます。

bq mk --dataset $(gcloud config get-value project):nyc_taxi_trip
bq mk --time_partitioning_type=HOUR --time_partitioning_field=timestamp nyc_taxi_trip.rides_1m "timestamp:timestamp, ride_status:string, count:integer"

3.Pythonファイルの確認(nyc_taxi_streaming_analytics1.py)

cloneしたフォルダの中身をcloud shellから確認しましょう。

今回使うファイルは、フォルダch10の中の「nyc_taxi_streaming_analytics1.py」と「nyc_taxi_streaming_analytics2.py」です。

フォルダを作って、作ったフォルダの中に移動させておきましょう。

mkdir dataflow-streaming-taxi-rides
mv nyc_taxi_streaming_analytics1.py dataflow-streaming-taxi-rides/
mv nyc_taxi_streaming_analytics2.py dataflow-streaming-taxi-rides/
cd dataflow-streaming-taxi-rides

わたしはgitがはいってますが、はいってなくても大丈夫です。

まずは「nyc_taxi_streaming_analytics1.py」から中身を確認していきます。

コード

# -*- coding: utf-8 -*-

# 次のデータパイプライン処理を行うDataflowジョブのコード、nyc_taxi_streaming2.py
# 1. Pub/Subからタクシーの位置情報をストリーミングに取得
# 2. タンブリングウィンドウで5分ごとの乗車数と降車数を
#    BigQueryのテーブルにデータ追加

import argparse
import logging
import json
from datetime import datetime

import apache_beam as beam
import apache_beam.transforms.window as window

from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


# ウィンドウの開始イベント時間を取得してPCollectionの要素に付加する処理を備えたクラス
class AttachWindowTimestamp(beam.DoFn):
    # PCollectionの各要素へ実施する処理
    # window=beam.DoFn.WindowParam を加えることで、ウィンドウに関する情報が取得できる
    def process(self, element, window=beam.DoFn.WindowParam):
        (status, count) = element
        # 当該ウィンドウの開始イベント時間を取得
        window_start_dt = window.start.to_utc_datetime()

        # PCollectionに含まれた要素にウィンドウのイベント開始時間を足して返却
        status_count = {"timestamp": window_start_dt.strftime('%Y-%m-%d %H:%M:%S'),
                        "ride_status": status,
                        "count": count}
        yield status_count


# ジョブ実行時のメイン処理部分
def run(argv=None, save_main_session=True):

    # 実行時のコマンドで受け付けるオプションの設定
    parser = argparse.ArgumentParser()
    # 入力のサブスクリプション受付のためのオプション
    parser.add_argument(
      '--input_subscription',
      required=True,
      help=(
          'Input PubSub subscription '
          '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>".'))
    # 出力のBigQueryデータセット受付のためのオプション
    parser.add_argument(
      '--output_dataset',
      required=True,
      help=(
          'Output BigQuery dataset '
          '"<PROJECT>.<DATASET>"'))
    known_args, pipeline_args = parser.parse_known_args(argv)

    # パイプラインに渡すオプションインスタンスを生成します。
    # streaming=True でストリーミングジョブを有効にするオプションを明示的に渡します。
    pipeline_options = PipelineOptions(pipeline_args, streaming=True)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    # パイプラインの生成
    with beam.Pipeline(options=pipeline_options) as p:

        subscription = known_args.input_subscription
        (bigquery_project, dataset) = known_args.output_dataset.split('.')

        rides = (
            p
            # ReadFromPubSub()で指定されたPub/Sub スブスクリプションからメッセージを取得
            | 'Read From PubSub' >> ReadFromPubSub(subscription=subscription).with_output_types(bytes)
            # メッセージ文字列をPythonのディクショナリに変換します。
            | 'ToDict' >> beam.Map(json.loads)
        )

        # PCollectionの要素が乗車および降車のデータのみ返却する関数
        def is_pickup_or_dropoff(element):
            return element['ride_status'] in ('pickup', 'dropoff')

        rides_onoff = (
            rides
            # 乗降車データのみ抽出。走行中 enroute データを除外
            | 'Filter pickup/dropoff' >> beam.Filter(is_pickup_or_dropoff)
        )

        rides_onoff_1m = (
            rides_onoff
            # タンブリングウィンドウ生成
            | 'Into 1m FixedWindow' >> beam.WindowInto(window.FixedWindows(60))
            # 乗車ステータスごとに件数を集計
            | 'Group status by rides' >> beam.Map(lambda x: (x['ride_status'],1))
            | 'Count unique elements' >> beam.combiners.Count.PerKey()
            # ウィンドウの開始イベント時刻をデータに付与
            | 'Attach window start timestamp' >> beam.ParDo(AttachWindowTimestamp())
        )

        # WriteToBigQueryを使って、BigQueryへストリーミング挿入で結果を出力
        rides_onoff_1m | 'Write 1m rides to BigQuery' >> WriteToBigQuery('rides_1m',
                                                            dataset=dataset,
                                                            project=bigquery_project,
                                                            create_disposition=BigQueryDisposition.CREATE_NEVER)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

具体的な説明は書籍で記載あるので省きますが、関数runを実行させて、実行の際に引数としてinput=Pub/Subのサブスクリプション、output=データセットを指定します。

流れとしては、

  1. PCollection「rides」を作成(Pub/Subサブスクリプション(jsonデータを読み込む))
  2. PCollection「rides」を変換(Transform)し、PCollection「rides_onoff」を作成
  3. PCollection「rides_onoff」を変換(Transform)し、PCollection「rides_onoff_1m」を作成
  4. PCollection「rides_onoff」をBigQueryに書き込む

このような流れです。冒頭で説明したパイプラインの流れ通りにコードも記載していけるので非常に可動性が高いです。

4.Dataflowの実行、BigQueryデータ確認

それでは以下コードを実行して、Dataflowを起動します。

python3 nyc_taxi_streaming_analytics1.py --project $(gcloud config get-value project) --job_name=taxirides-realtime --region='us-central1' --runner DataflowRunner --input_subscription "projects/$(gcloud config get-value project)/subscriptions/streaming-taxi-rides" --output_dataset "$(gcloud config get-value project).nyc_taxi_trip"

コンソールのジョブが正常に動き出したらOKです。

数分後、BigQueryのデータセットにデータがどんどんはいってきます。

では、もう一つやってみます。

5.BigQueryに別のテーブルを作成

以下コマンドでBigQueryにテーブルを作成します。

bq mk nyc_taxi_trip.trips_od "ride_id:string, pickup_datetime:datetime, dropoff_datetime:datetime, pickup_location:geography, dropoff_location:geography, meter_reading:float, time_sec:integer, passenger_count:integer"

6.Pythonファイルの確認(nyc_taxi_streaming_analytics2.py)

コード

# -*- coding: utf-8 -*-

# 次のデータパイプライン処理を行うDataflowジョブのコード、nyc_taxi_streaming2.py
# 1. Pub/Subからタクシーの位置情報をストリーミングに取得
# 2. タンブリングウィンドウで5分ごとの乗車数と降車数を
#    BigQueryのテーブルにデータ追加
# 3. セッションウィンドウで乗車ごとに乗降車データを一つの
#    レコードにまとめてBigQueryのテーブルにデータ追加

import argparse
import logging
import json
from datetime import datetime

import apache_beam as beam
import apache_beam.transforms.window as window

from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


# ウィンドウの開始イベント時間を取得してPCollectionの要素に付加する処理を備えたクラス
class AttachWindowTimestamp(beam.DoFn):
    # PCollectionの各要素へ実施する処理
    # window=beam.DoFn.WindowParam を加えることで、ウィンドウに関する情報が取得できる
    def process(self, element, window=beam.DoFn.WindowParam):
        (status, count) = element
        # 当該ウィンドウの開始イベント時間を取得
        window_start_dt = window.start.to_utc_datetime()

        # PCollectionに含まれた要素にウィンドウのイベント開始時間を足して返却
        status_count = {"timestamp": window_start_dt.strftime('%Y-%m-%d %H:%M:%S'),
                        "ride_status": status,
                        "count": count}
        yield status_count

# 乗車と降車のデータを整形して一つのレコードに乗降者データを格納する処理を備えたクラス
class CompileTripOD(beam.DoFn):
    # PCollectionの各要素へ実施する処理
    def process(self, element):
        (ride_id, rides) = element

        # 最終的につくるレコードを初期化
        trip_od = {"ride_id": ride_id,
                   "pickup_datetime": None,
                   "pickup_location": None,
                   "dropoff_datetime": None,
                   "dropoff_location": None,
                   "meter_reading": None,
                   "time_sec": None,
                   "passenger_count": None}

        pickup_dt = None
        dropoff_dt = None
        # 乗車データと降車データを取り出してそれぞれの情報をtrip_odに反映
        for ride in rides:
            if ride['ride_status'] == 'pickup':
                trip_od['pickup_datetime'] = ride['timestamp'][0:19]
                trip_od['pickup_location'] = 'POINT({} {})'.format(ride['longitude'],ride['latitude'])
                trip_od['passenger_count'] = ride['passenger_count']
                pickup_dt = datetime.strptime(trip_od['pickup_datetime'], '%Y-%m-%dT%H:%M:%S')
            elif ride['ride_status'] == 'dropoff':
                trip_od['dropoff_datetime'] = ride['timestamp'][0:19]
                trip_od['dropoff_location'] = 'POINT({} {})'.format(ride['longitude'],ride['latitude'])
                trip_od['meter_reading'] = ride['meter_reading']
                dropoff_dt = datetime.strptime(trip_od['dropoff_datetime'], '%Y-%m-%dT%H:%M:%S')

        # 乗車と降車の両方が揃っていた場合、乗車時間を計算して追加
        if pickup_dt and dropoff_dt:
            trip_od['time_sec'] = (dropoff_dt - pickup_dt).total_seconds()

        yield trip_od


# ジョブ実行時のメイン処理部分
def run(argv=None, save_main_session=True):

    # 実行時のコマンドで受け付けるオプションの設定
    parser = argparse.ArgumentParser()
    # 入力のサブスクリプション受付のためのオプション
    parser.add_argument(
      '--input_subscription',
      required=True,
      help=(
          'Input PubSub subscription '
          '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>".'))
    # 出力のBigQueryデータセット受付のためのオプション
    parser.add_argument(
      '--output_dataset',
      required=True,
      help=(
          'Output BigQuery dataset '
          '"<PROJECT>.<DATASET>"'))
    known_args, pipeline_args = parser.parse_known_args(argv)

    # パイプラインに渡すオプションインスタンスを生成します。
    # streaming=True でストリーミングジョブを有効にするオプションを明示的に渡します。
    pipeline_options = PipelineOptions(pipeline_args, streaming=True)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    # パイプラインの生成
    with beam.Pipeline(options=pipeline_options) as p:

        subscription = known_args.input_subscription
        (bigquery_project, dataset) = known_args.output_dataset.split('.')
        #ridesというPCollectionを作成
        rides = (
            p
            # ReadFromPubSub()で指定されたPub/Sub スブスクリプションからメッセージを取得
            | 'Read From PubSub' >> ReadFromPubSub(subscription=subscription).with_output_types(bytes)
            # メッセージ文字列をPythonのディクショナリに変換します。
            | 'ToDict' >> beam.Map(json.loads)
        )

        # PCollectionの要素が乗車および降車のデータのみ返却する関数
        def is_pickup_or_dropoff(element):
            return element['ride_status'] in ('pickup', 'dropoff')
        #PCollection「rides」をTransformして「rides_onoff」というPCollectionを作成
        rides_onoff = (
            rides
            # 乗降車データのみ抽出。走行中 enroute データを除外
            | 'Filter pickup/dropoff' >> beam.Filter(is_pickup_or_dropoff)
        )
        #PCollection「rides_onoff」をTransformして「rides_onoff_1m」というPCollectionを作成
        rides_onoff_1m = (
            rides_onoff
            # タンブリングウィンドウ生成
            | 'Into 1m FixedWindow' >> beam.WindowInto(window.FixedWindows(60))
            # 乗車ステータスごとに件数を集計
            | 'Group status by rides' >> beam.Map(lambda x: (x['ride_status'],1))
            | 'Count unique elements' >> beam.combiners.Count.PerKey()
            # ウィンドウの開始イベント時刻をデータに付与
            | 'Attach window start timestamp' >> beam.ParDo(AttachWindowTimestamp())
        )

        #PCollection「rides_onoff_1m」をWriteする
        # WriteToBigQueryを使って、BigQueryへストリーミング挿入で結果を出力
        rides_onoff_1m | 'Write 1m rides to BigQuery' >> WriteToBigQuery('rides_1m',
                                                            dataset=dataset,
                                                            project=bigquery_project,
                                                            create_disposition=BigQueryDisposition.CREATE_NEVER)

        #PCollection「rides_onoff」をTransformして「trips_od」というPCollectionを作成
        trips_od = (
            # 乗降車に絞ったデータPCollection
            rides_onoff
            # セッションウィンドウで利用するためのセッションIDとなるride_idをキーに設定
            | 'Key-value pair with Ride_id' >> beam.Map(lambda x: (x['ride_id'],x))
            # セッションウィンドウ設定。ギャップ期間を5分に設定。
            # もし同じ乗車データの位置情報が5分より大きな間隔をあけて到着した場合、
            # 別のセッションとして集計される
            | 'Into SessionWindows' >> beam.WindowInto(window.Sessions(5*60))
            | 'Group by ride_id' >> beam.GroupByKey()
            # セッション内でまとめた乗車および降車データを一つの要素に結合する
            # 処理は、CompileTripODクラスで実装
            | 'Compile trip OD' >> beam.ParDo(CompileTripOD())
        )
        #PCollection「trips_od」をWriteする
        trips_od | 'Write od trips to BigQuery' >> WriteToBigQuery('trips_od',
                                                                   dataset=dataset,
                                                                   project=bigquery_project,
                                                                   create_disposition=BigQueryDisposition.CREATE_NEVER)
#PCollection「ride_onoff」から2つに分岐して生成している
#「rides_onoff_1m」「trips_od」の2つのPCollectionをwriteしている
#このように一つのデータソースに対して複数の処理を条件分岐ができるため、コストの回避と管理の効率性をあげることができる

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

1と違うところは、複数のPCollectionを作り、複数の処理を同時に行なっているところです。

PCollection「ride_onoff」を「rides_onoff_1m」「trips_od」の2つに分岐させ、それぞれテーブルに書き込んでます。

このようにすることで一つのデータソースに対して複数の処理を条件分岐できるため、コストの削減や管理の効率性上昇が見込めます。

7.Dataflowの実行、BigQueryデータ確認

以下コマンドで実行してみましょう。

1で実行したコマンドに–updateを追加して、対象のファイルを変更しただけです。このように記載することで今動いているジョブを更新することができます。

python3 nyc_taxi_streaming_analytics2.py --update --project $(gcloud config get-value project) --job_name=taxirides-realtime --region='us-central1' --runner DataflowRunner --input_subscription "projects/$(gcloud config get-value project)/subscriptions/streaming-taxi-rides" --output_dataset "$(gcloud config get-value project).nyc_taxi_trip"

このように複数の処理を同時に実行させています。

数分すると先程作った「trips_od」テーブルにもデータが入ってきます。

8.DataPortalで可視化

最後にDataPortalで可視化してみましょう。

DataPortalは15分更新なので、本当はLookerとかを使うのがベストですが、契約してないので仕方なく、、、。

9.環境のリセット

こちらで作成した以下のツールをすべて削除しましょう。削除しないとお金がかかり続けてしまうので、注意してください。

  • Dataflow
  • Cloud Storage
  • Pub/Sub
  • BigQuery

まとめ

以上でリアルタイムデータ分析基盤の構築ができました。Dataflowは非常に使いやすく、簡単にパイプラインを構築し、データ処理を行えるツールです。

私もまだまだ使いきれてないですが、IOTデータや位置情報データなどリアルタイムでデータがほしいようなビジネスのデータ基盤をもっと作ってみたいなと勉強してて思いました。

まだまだバッチでいいってところ多いですからね。