Cloud Pub/Sub Topicへメッセージを短時間で大量にPublishする

ZOZO Advent Calendar 2023 21日目の記事になります。

今回紹介する記事は、Cloud Pub/Sub Topicへメッセージを大量にPublishする方法になります。Cloud Pub/Subの背後に連携したリソースの負荷試験や、データのスループットを計測したい際、大量のメッセージを短時間でPublishしたい時があります。一つ一つのメッセージをPublishしてresultメソッドを呼び出してもメッセージのPublishが遅くなるのである程度まとめてPublishするようなロジックを作成します。

基本的な部分はGoogle Cloudの下記記事と同じようなロジックになります。

cloud.google.com

検証バージョン

Python 3.12
google-cloud-pubsub 2.19.0

実装

from concurrent import futures
from google.cloud import pubsub_v1

project_id = ""
topic_id = "test-topic"

# データ量に応じて調節する
batch_settings = pubsub_v1.types.BatchSettings(
    max_messages=1000,
    max_bytes=1024 * 1024,
    max_latency=10,
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_id)
sample_messages = []
# generate publish data
for i in range(1, 10001):
  sample_message = f"Sample Message {i}"
  sample_messages.append(sample_message)

loop_count = 0
publish_futures = []
print("Start publishing message")
for sample_message in sample_messages:
  message = sample_message.encode("utf-8")
  future = publisher.publish(topic_path, message)
  publish_futures.append(future)
  if len(publish_futures) >= 1000:
    futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
    publish_futures = []

  loop_count += 1
  if loop_count % 1000 == 0:
    print(f"publish progress: {loop_count}")

print("Messages published")

仕組みについて

より多くのメッセージをPublishするためPythonの標準ライブラリであるconcurrent.futuresを利用します。Pub/Sub Clientのpublishメソッドは返り値にfutureオブジェクトを返します。futuresオブジェクトの監視を行うため、その後publish_futuresというリストに返されたfutureオブジェクトをappendします。ある程度publish_futuresリスト内にfutureオブジェクトが溜まってきたら、futures.waitメソッドを呼び出し、publish_futuresリスト内に存在するすべてのfutureオブジェクトが完了するのを待ちます。 上記のロジックにすることでfutureオブジェクトを任意の量生成してメッセージをまとめてpublishできます。

まとめ

futuresオブジェクトをうまく利用しメッセージを短時間でPublishするようなロジックを作成できました。あくまで実装方法の一つなので他に良い実装方法を知っている方は是非教えて下さい!