Apache BeamでBigQueryに書き込む際エラーログを出力する

ZOZO Advent Calendar 2022 13日目の記事です。
今回はApache Beam v2.41辺りで導入されたBigQueryに書き込む際、エラーが発生した場合のエラーメッセージを出力できるようになったので紹介します。

経緯

Apache Beam v2.40以前はBigQueryに書き込みエラーが発生した場合、dictが返されます。FailedRowsキーにテーブル情報、書き込もうとしたデータがtupleとして格納されています。しかし、エラーログに関しては取得できません。

公式の方ではdeadletter patternとして紹介されています。 beam.apache.org

import apache_beam as beam

class TestPipeline(beam.DoFn):
  def pipeline(self):
      table_id = ""
      options = beam.options.pipeline_options.PipelineOptions()
      with beam.Pipeline(options=options) as p:
        # hogeカラムはint型なのでエラーになる
        elements = (p | beam.Create([{"hoge": "fuga"}]))

        errors = ( 
          elements
          | 'Write to table' >> beam.io.gcp.bigquery.WriteToBigQuery(
            table=table_id,
            # エラーが発生した際リトライは行わない
            insert_retry_strategy=beam.io.gcp.bigquery_tools.RetryStrategy.RETRY_NEVER,
            )
        )
        # 書き込みに失敗した場合処理される
        ( errors["FailedRows"] | beam.Map(print) )
def run():
  test = TestPipeline()
  test.pipeline()

if __name__ == '__main__':
  run()

出力結果

# コンソール上にエラーログは出力される
ERROR:apache_beam.io.gcp.bigquery:There were errors inserting to BigQuery. Will not retry. Errors were [{'index': 0, 'errors': [{'reason': 'invalid', 'location': 'hoge', 'debugInfo': '', 'message': 'Cannot convert value to integer (bad value): fuga'}]}]
# printした結果。FailedRows内にエラーログは含まれていない
('competitor-ec-crawler-dev:temp.sample2', {'hoge': 'fuga'})

エラーログを取得できるようにする

検証環境

以下のバージョンで実装を行います。
Python 3.9
apache-beam 2.34.0

実装

実装は下記になります。内容はあまり変わっておらず、FailedRowsキーを指定した部分がfailed_rows_with_errorsメソッドで呼び出しています。

import apache_beam as beam

class TestPipeline(beam.DoFn):
  def pipeline(self):
      table_id = ""
      options = beam.options.pipeline_options.PipelineOptions()
      with beam.Pipeline(options=options) as p:
        # hogeカラムはint型なのでエラーになる
        elements = (p | beam.Create([{"hoge": "fuga"}]))

        errors = ( 
          elements
          | 'Write to table' >> beam.io.gcp.bigquery.WriteToBigQuery(
            table=table_id,
            # エラーが発生した際リトライは行わない
            insert_retry_strategy=beam.io.gcp.bigquery_tools.RetryStrategy.RETRY_NEVER,
            )
        )
        # 書き込みに失敗した場合処理される
        ( errors.failed_rows_with_errors | beam.Map(print) )
def run():
  test = TestPipeline()
  test.pipeline()

if __name__ == '__main__':
  run()

結果

WARNING:apache_beam.io.gcp.bigquery:There were errors inserting to BigQuery. Will not retry. Errors were [{'index': 0, 'errors': [{'reason': 'invalid', 'location': 'hoge', 'debugInfo': '', 'message': 'Cannot convert value to integer (bad value): fuga'}]}]
# printした結果。エラーログが含まれている
('competitor-ec-crawler-dev:temp.sample2', {'hoge': 'fuga'}, [{'reason': 'invalid', 'location': 'hoge', 'debugInfo': '', 'message': 'Cannot convert value to integer (bad value): fuga'}])

まとめ

無事にエラーログをプログラム上で取得できました。エラーになったデータを別のテーブルに書き込む際、エラーログも書き込みたい場合便利です。 他にも利用できるメソッドがあるので詳しくは下記ドキュメントをご覧ください。 beam.apache.org