エムスリーテックブログ

エムスリー(m3)のエンジニア・開発メンバーによる技術ブログです

BigQuery から高速にデータを持ってきてシームレスに Kotlin data class で使いたい

この記事は基盤開発チームのテックブログリレー3日目の記事です。 (2日目の記事は、田尻さんの「未来に続く、これからの非同期処理 ~ Algebraic Effect and Handlers ~ - エムスリーテックブログ」でした)

こんにちは、基盤開発チーム/Unit3を兼務している林です。

最近の仕事で、AWS環境のKotlinアプリケーションでBigQueryにあるデータを高速処理したくて、BigQuery Storage Read APIを使ったうえでKotlin data classへの変換処理を実装しました。

「簡単に見える処理でも色々考えることがあるなあ」ということをあらためて実感したので、それを具体的な事例として書き残しておきます。

実現したかったこと

今回やりたかったことは、日本語で簡単に記述できます。 「自由に書いたSQLクエリをBigQueryで実行した結果を、高速に、Kotlinで使いやすい形で取得したい」

もうちょっと具体的にKotlinコードで書いてみると、次のようにBigQueryのSQL実行結果を、そのままdata classで受け取って処理できるようにするというだけです。ただし、できる限りに効率的にやりたい。

data class User(
  val age: Long,
  val name: String
)
val user: Flow<User> = bq.query("SELECT age, name FROM project.dataset.user")
// 大量のデータを扱いたいのでストリーム処理ができるFlowにしていますが、Sequenceなどでもよいです

前提

今回は、BigQueryのそこそこ大きなクエリ実行結果(100MB以上)を、AWS環境のKotlinアプリケーションに高速に持ってきてシームレスに使えるようにした話です。

BigQueryは内部のクエリ実行は速いですが、Google Cloudの外にクエリ結果を持っていこうとしたときに、普通にやるとかなり遅くなります。 環境にもよると思いますが、通常のBigQuery APIで同じデータサイズでやってみると、300秒以上かかりました。

そこで、Google Cloud外に高速にデータ転送するために、BigQuery Storage Read API(以下、Storage Read)が用意されています。 Storage Read APIを使うには、ストリーム並列処理やストリーム内のチャンクの処理などが必要になり、その対応をしたライブラリを使うか、無ければある程度自前で実装する必要があります。*1

Storage Readでのデータ転送形式として、Apache Avro(行指向)とApache Arrow(列指向)の2種類があります。 今回は、クエリ結果の行をKotlin data classにシームレスに変換したいので、1行ずつ処理できるAvro形式を選択し、BigQuery Storage Readでクエリ結果を高速にダウンロードし、data classで使いやすくなるようにしました。

実際にやったこと - 数十倍の速度で処理可能に

やったことは、BigQuery Storage Read APIを使うことと、そこから送られてくるAvroデータのデコード、そしてKotlin data classへの変換処理です。

  • Java の BigQuery Storage Read クライアントを使って、Storage Read APIでストリーム並列処理
  • Java の Avroデコードライブラリを使って、GenericDatumReaderでAvroレコードに変換した後に、リフレクションでdata classに変換
  • Avroデコード処理のパフォーマンスチューニング(オブジェクトやリフレクション情報をできるだけキャッシュしたり、ナノ秒レベルの改善と積み重ねて、1レコード100ナノ秒程度でAvroバイナリからdata class変換できるように)

ここまでやって、性能的には1000万行(100MB)のクエリ結果を1〜2秒でロードしてdata classで使えるようになりました。 その後、「ORDER BYの際の順序保証の懸念」(後述)があることが分かり、ストリーム並列を一旦断念したので、最終的には、1000万行(100MB)を10秒程度で処理可能というレベルになりました。

Kotlin data classへの変換処理の実装

今回Avroバイナリデータのデコード処理に、JavaのAvroライブラリを使っています。 Avroバイナリデータをデコードするときに、(ReflectDatumReader)ReflectDatumReader (Apache Avro Java 1.7.4 API)という、リフレクションによりJavaオブジェクトに変換してくれるものがあり、検討した結果採用しませんでした。

まず、Javaのライブラリなので、KotlinのNullableオブジェクトを正しく認識せず、自前でKotlinリフレクションを使ってNullable判定をしてAvroスキーマ補正をしないと動きませんでした。 また、リフレクション処理の中でデフォルトコンストラクタが使われるので、data classに引数なしのコンストラクタを別途つけるか、全部にデフォルト引数を付ける必要もありました。 不要な定義を書くのも避けたいですが、本来は必要なデータの値が全てある状態でのみ作られるdata classが、不完全な状態で作れてしまうのもよくないです。

そこで今回は、ReflectDatumReaderではなく、リフレクション処理をしないもっとも基本的なGenericDatumReaderを使ってAvroレコードオブジェクトに変換した後に、自前でKotlin data class専用のリフレクション処理を実装しました。

Avroバイナリからdata classへの変換処理を書いてチューニングしていった結果、ReflectDatumReaderを使ったものよりもシングルスレッドでは2倍程度、マルチスレッドでは8倍程度高速になりました。

ライブラリのリフレクション機能を採用しなかったことで結果的に、Kotlinで扱ううえでの機能性も性能もどちらも向上しました。

やってみたが、最終的にやらなかったこと

ソフトウェア開発において、何を付け足したかは当然大事ですが、何をしなかったのかも重要です。 往々にして、検討したけどやらなかったことは、コメントやドキュメントやテックブログなどに書かれないことも多いです。 今回はあえてやらなかったことについて、いくつか言語化しておきます。

BigQueryのDATETIME型の取得をサポートしなかった

BigQuery Storage Readで送られてくるAvroデータは、物理的なデータ型の他に、LogicalTypeというものが設定されています。これのおかげで、内部表現としては整数でもDATE型と認識できます。 TIMESTAMP型は内部的に整数なので効率的ですが、DATETIMEはAvroデータの内部的には文字列型(string)ということが分かりました。*2

初めはDATETIME型の変換も実装したのですが、最終的にはDATETIME型は意図的にサポートしませんでした。

まず、性能の観点で、文字列型を扱うのは、整数に比べて、通信でもデコードでも非効率です。

TIMESTAMP型はUTCなのでタイムゾーンの問題はありませんが、DATETIME型はローカルタイムゾーンであり、そのデータの値だけを見て正しく扱うことが難しいです。 1サービスの中でなら統一できますが、BigQueryには他のサービスのデータもあり、ローカル時刻のローカルが指すものは絶対的ではありません。*3

クエリ対象データにDATETIME型があったとしても、データ取得クエリを書くときに、クエリを書く人が対象データのタイムゾーンについて知っているはずなので、適切にTIMESTAMP変換を明示すれば問題ありません。

DATETIME型を意図的にサポートしない判断により、効率性を守り、機能性は落とさず、デコーダの保守性も向上する良い判断だったと思っています。

BigQuery Storage Readの並列ストリームをやめた

並列ストリームの処理を記述しているときに、1つ懸念がありました。 クエリ結果を複数に分割して並列にダウンロードして、到着したものから処理ができるのであれば、ORDER BYで並べた結果の順序は保証されない可能性があります。

最初調査したときに、Google実装のBigQuery Pythonクライアントドキュメントで「ORDER BYクエリのときは、最大並列ストリーム数が1に調整される」との記述を見かけて、「BigQueryサーバー側で自動で調整してくれるのだな」と思い違いをしたのですが、そうではなく(サーバー側ではなく)、そのクライアント実装においてORDER BY判定とストリーム数調整していたのでした。

そのクライアント実装の順序判定を見たところ、「ORDER BY」の文字列があったときは保守的に順序がある可能性がある、という判定をしていました。 そのナイーブな判定方法をプロダクトに採用するのは少し不安があったので、今回は、一旦並列ストリームを使わない判断にしました。*4

おわりに

今回実装する前にチームに相談したときに、チームリーダーの横本さんから「実装したら社内事例にもなるし、やってみてもいいのでは」というようにゆるく背中を押されたこともあり、チャレンジしてみて良かったです。

学びもありましたし、実際に便利に高速に処理できるので、今後長く使える実装になると嬉しいです。

We are hiring !!

エムスリーエンジニアグループの基盤開発チームでは、日々チャレンジしながら社内の基盤的プロダクトの開発をする仲間を募集しています!

まずはカジュアル面談から、以下URLよりご応募をお待ちしています。

jobs.m3.com

*1:たとえばPythonのpandasでは簡単にBigQuery Storage Readを使うことができますが、どの言語でも簡単に使えるわけではないようです。今回もApache Beamを使えば簡単にできるのでは、と思い試しましたが、Beam用の特殊な実行環境を用意しない状態だと、Bigquery Storage Readを使っても、あまり性能が出ませんでした。

*2:BigQueryの型とAvro LogicalTypeの対応 https://cloud.google.com/bigquery/docs/reference/storage#avro_schema

*3:弊テックブログの過去記事でも、時刻データの取扱について書かれたものがいくつかありますが、時刻データはそれだけよく使う上に扱いが難しいということですね。 タイムゾーンを考慮した日時の扱いのベストプラクティス - エムスリーテックブログ 9時間足すんだっけ引くんだっけ問題~あるいは、諸プログラミング言語はいかにタイムゾーンと向き合っているか - エムスリーテックブログ

*4:将来的に、たとえば、クエリのDry Run実行時に「このクエリは順序があります」などの情報を返してくれるようになったら、安全に順序判定ができてストリーム数調整ができる可能性があると思います。