Yann Moisan
Yann Moisan

Follow

May 29, 2018 · 9 min read

Spark is the core component of Teads’s Machine Learning stack. We use it for many ML applications, from ad performance predictions to user Look-alike Modeling. また、Sparkを使用して、cassandraにデータをロードするためのクロスデバイスセグメント拡張やParquetからSSTablesへの変換などの集中的なジョブを処理します。

Sparkでの作業では、メモリ、ディスク、CPUの点でクラスタのリソースの限界に定期的に達します。 スケールアウトは問題を押し戻すだけなので、手を汚さなければなりません。

以下は、パフォーマンスの向上とよりクリーンなSparkコードを達成するためのSpark2.2.0のベストプラクティスと最適化のヒントのコレクシ:

  • タングステンを活用する方法、
  • 実行計画分析、
  • データ管理(キャッシュ、放送)、
  • クラウド関連の最適化(S3を含む)。

更新07/12/2018、トラブルシューティングのトリックと外部データソース管理をカバーする第二部も参照してください。常識ですが、コードのパフォーマンスを向上させる最良の方法は、Sparkの強みを取り入れることです。 そのうちの一つはタングステンです。

Standardバージョン1.5以降、Tungstenはspark SQLコンポーネントであり、実行時にバイトコードでSpark操作を書き換えることによってパフォーマンスを向上させます。 タングステンは、仮想機能を抑制し、ジョブCPUとメモリ効率に焦点を当てて、ベアメタル性能に近い活用しています。

Tungstenを最大限に活用するために、以下に注意を払っています。

DataFramesではなくDataset構造を使用してください

コードがTungstenの最適化から可能な限り利益を得られるようにするために、(RDDではなく)ScalaでデフォルトのDataset APIを使用しています。

Datasetは、リレーショナル(DataFrame)変換と機能(RDD)変換の組み合わせで、両方の長所をもたらします。 このAPIは最新のものであり、より良いエラー処理とはるかに読みやすい単体テストと一緒に型安全性を追加します。しかし、map関数とfilter関数がこのAPIでは貧弱に実行されるため、トレードオフが付属しています。 フレームレスは、この制限に取り組むための有望な解決策です。

ユーザー定義関数(Udf)をできるだけ避ける

udfを使用すると、従来のScalaでデータを処理してから再シリアル化するための逆シリアル化が意味されます。 UdfはSpark SQL関数に置き換えることができますが、すでに多くのものがあり、新しいものが定期的に追加されています。

Udfを回避すると、すぐには改善されないかもしれませんが、コードが変更された場合、少なくとも将来のパフォーマンスの問題を防ぐことができます。 また、組み込みのSpark SQL関数を使用することで、すべてがSpark側で実行されるため、テストの労力を削減しました。 これらの関数はJVMの専門家によって設計されているため、Udfはパフォーマンスを向上させる可能性はありません。たとえば、次のコードは組み込みのcoalesce関数で置き換えることができます。

:組み込みの置換がない場合でも、Catalyst(SparkのSQL optimizer)式クラスを実装および拡張することは可能です。 それはコード生成でうまくいくでしょう。 詳細については、Chris Freglyがここで話しました(スライド56参照)。 これを行うことにより、我々は直接タングステン形式にアクセスし、それが直列化の問題を解決し、性能をバンプします。

ユーザー定義集計関数(UDAFs)を避ける

UDAFは、HashAggregateよりも大幅に遅いSortAggregate操作を生成します。 たとえば、中央値を計算するUDAFを記述する代わりに、組み込みの同等の(quantile0,5)を使用します。

df.stat.approxQuantile("value”, Array(0.5), 0)

approxQuantile関数は、Greenwald-Khannaアルゴリズムのバリエーション 私たちの場合、同等のUDAFよりも10倍高速になりました。

複数のことを実行するUdfやUdafを避ける

ソフトウェアクラフトマンシップの原則は、ビッグデータのものを書くときに明らかに適用されま Udfを分割することで、結果のコードの一部に組み込み関数を使用できます。 それはまたテストを非常に簡単にします。

2-フードの下を見る

Sparkの実行計画を分析することは、潜在的な改善を見つける簡単な方法です。 この計画は、Sparkの物理的な実行単位であるステージで構成されています。 コードをリファクタリングするとき、最初に探すのは異常な数のステージです。 疑わしい計画は、2つのデータフレーム間の基本的な結合操作のために2-3ではなく10段階を必要とする計画である可能性があります。Spark、より一般的には分散コンピューティングでは、ネットワーク経由でデータを送信する(SparkではShuffleとも呼ばれる)のが最も高価なアクションです。 シャッフルは、ディスクI/O、データシリアル化、およびネットワークI/Oを必要とするため、高価です。JoinやgroupByなどの操作に必要であり、ステージ間で発生します。

これを考慮すると、ステージ数を減らすことは、ジョブを最適化するための明白な方法です。 私たちは、使用します.explain(true)コマンドは、ジョブに関連するすべてのステップ(ステージ)の詳細を示す実行計画を表示します。 ここに例があります:

Simple execution plan example

The Directed Acyclic Graph (DAG) in Spark UI can also be used to visualize the task repartition in each stage.div>

最適化は、データとその処理に関する知識の両方に大きく依存しています(税込。 ビジネスロジック)。 Catalystを使用したSpark SQL最適化の制限の1つは、実行計画を最適化するために「メカニック」ルールを使用することです(2.2.0で)。

他の多くの人と同様に、ブロードキャスト結合の選択を超えたコストベースの最適化エンジンを待っていました。 それは今2.3.0で利用可能なようです、我々はそれを見なければならないでしょう。

3-データを知り、効率的に管理する

実行計画を調べてジョブのパフォーマンスを向上させる方法を見てきましたが、データ側には多くの拡張

非常に不均衡なデータセット

すべてがokかどうかをすばやく確認するために、各タスクの実行時間を確認し、異種プロセス時間を探します。 いずれかのタスクが他のタスクよりも大幅に遅い場合は、全体的なジョブ期間が延長され、最速のエグゼキュータのリソースが無駄になります。Spark UIで最小、最大、中央値の期間を確認するのはかなり簡単です。 バランスの取れた例は次のとおりです。

コメントを残す

メールアドレスが公開されることはありません。