データ

Apache Sparkで週300億件の速度予測を計算

見出し

これはレイアウト確認用のダミーテキストです。

Mapboxトラフィックデータの開発者向けブログ

Mapboxトラフィックデータの開発者向けブログ

Mapboxは、モバイル(iOSおよびAndroid)SDKから、毎日3億マイルを超える匿名化された位置データを収集しています。このデータを使用して、過去の観測から生成された特定の時間と道路の速度予測を計算し、次のような質問に答えます。SFのマーケットストリートの金曜日の正午の予想速度は?

この記事では、Apache Sparkを使用して、世界の道路網全体の1週間あたり300億件の速度予測をどのように計算できるかについて説明します。

速度予測の計算

当社のSDKから収集されたテレメトリーイベントは、匿名化され、プライバシーフィルター処理され、経度や緯度などの座標情報を含むトレースにチェーン化されます。最終的に、距離、期間、速度、およびヘディング情報は、連続する座標から導き出され、速度プローブと呼ばれます。

トレースから生成されたプローブは、全世界の道路網全体と照合されます。マッチング処理の最後に、各トレースに平均速度、5分間の時間帯、および道路セグメントを割り当てることができます。同じ5分間の時間帯に該当する同じ道路上のマッチは集計され、速度ヒストグラムが作成されます。最後に、集計された各ヒストグラムの速度を推定します。これは、特定の曜日の特定の時間にドライバーが道路で経験するであろう予測を表しています。

非常に多くのデータ

すべてのテレメトリートレースを世界中の道路ネットワーク全体と照合し、過去の観測を集計して、曜日ごとに5分間隔で道路ごとの速度推定値を取得します。おそらく、「うわー、それは処理するのに大量のデータが必要だろうけど、実際にはどれくらいなの?」と思っていることでしょう。

週単位では、平均して22億のトレース23億の道路に照合して、54億のマッチングを生成します。マッチングから、510億の速度ヒストグラムを作成し、最終的に300億の速度予測を生成します。

データのサイズ、変換と計算の複雑さに基づいて、pySparkの実装は非常に理にかなっていました。Sparkは、大規模な分散コンピューティングのためのフレームワークを提供し、大規模なデータセットの高速処理を可能にするためです。

データ処理パイプラインの設計

最初に時間を費やしたのは、パイプラインと、それが生成するすべての異なるデータセットのスキーマを設計することでした。私たちのパイプラインでは、各pySparkアプリケーションは、ダウンストリームアプリケーションが使用するためにすぐに利用できるHiveテーブルに保持されたデータセットを生成します。

1つのpySparkアプリケーションがすべてのステップ(マップマッチング、集計、速度推定など)を実行する代わりに、各ステップを個別のアプリケーションに分離しました。そうすることで、個々のアプリケーションごとにデータセットフィクスチャをモックすることができ、チーム全体での初期開発を迅速化し、分散させることができました。また、実際のプロダクションデータに対する複雑な変換の結果をテストおよび評価することも可能になりました。最後に、中間データセットを使用することで、データサイエンティストはパイプラインのさまざまなコンポーネントでモデル実験を実行できます。

関連テーブル間の関係を通じて最終的なトラフィックプロファイルデータセットに到達するために、可能な限りテーブルを正規化することを優先しました。正規化により、テーブルスキーマをデータセットを生成するアプリケーションで定義でき、データの整合性を維持し、データの冗長性を排除できます。当然のことながら、結合などの変換が法外に高価になった場合に非正規化するという選択肢も念頭に置いていました。

当社のパイプライン:

データパーティショニング

パーティショニングにより、データの一部に対するクエリがより速く、より簡単になります。結果として得られるすべてのデータセットは、時間的および空間的次元の両方でパーティション化します。

Airflowを使用すると、空間パーティション分割をパイプラインオーケストレーションに簡単に引き継ぎ、「全世界」を一度に処理する代わりに、パーティションごとにパイプラインを実行できます。これにより、各パイプライン内のデータサイズが削減され、スケーラビリティが向上します。スパースパーティションを選択してパイプライン全体をテストできるため、開発、反復、および本番データに対する頻繁なテストを迅速に行うことができます。

データの偏り

データがパーティションまたはキー全体に均等に分散されていない場合、データは偏った状態になります。これは、特定の地理的な場所に他の場所よりも常に多くのデータが存在するため、テレメトリデータの一般的な特性です。

結合のような変換を実行する場合、Spark はパーティション式を評価してデータが同じ場所に配置されます。データが不均等に分散された変数でキー付けされている場合、いくつかの非常に大きなパーティションが発生する可能性があります。

これは処理時に問題となります。なぜなら、Sparkはパーティションごとに1つのタスクを割り当てるからです。したがって、非常に大きなパーティションが1つある場合、そのパーティションを処理するタスクは時間の大部分を占め、他のタスクの90%以上はすぐに完了します。明らかに、これは分散処理の目的を損ない、リソースを浪費します。結局のところ、実行時間がほぼ同じになる妥当な数のタスクが必要になります。

データスキューを軽減するための複数の戦略を検討しました。

  • パーティション数の増加: repartitionを使用してデータフレームのパーティション数を単純に増やしたり、spark.sql.shuffleを設定したりすると、データが極端に偏っていない場合に役立ちます。
  • 新しい一意のIDを作成する:一意のID列を追加して再パーティション化すると、ハッシュパーティショナーが偏った変数とは無関係に各行をパーティションに割り当てるため、バランスの取れたパーティションが作成されます。


uuid_udf = udf(lambda: uuid_generator(), StringType())
df = skewed_df.withColumn("u_id", uuid_udf())\
.repartition(num_partitions, 'u_id')

  • 偏ったキーをソルトする:偏ったキーに対する結合のような変換を実行する必要がある場合、キーにランダム化を追加すると、より均等に分散されます。許容できるバッチサイズを定義し、バッチ内のすべてのキーを同じランダムな整数でソルトすることから始めます。事実上、大きなパーティションを均等に分散された小さなグループに分割します。

結論

当社のpySparkパイプラインは、1日に数十億行のデータを処理し、モデルの反復、改善、変更の迅速な評価を可能にします。

この規模のデータを使用したプロジェクトでSparkを扱うには、Sparkの内部構造と、基盤となるデータがパフォーマンスにどのように大きな影響を与えるかを深く理解する必要がありました。

これはレイアウト確認用のダミーテキストです。

これはレイアウト確認用のダミーテキストです。

関連記事